diff --git a/build.gradle b/build.gradle
index 94e521b9a3e17..0b3c570ddf5ca 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1069,6 +1069,7 @@ project(':metadata') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
+ implementation project(':raft')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.metrics
@@ -1077,6 +1078,7 @@ project(':metadata') {
testImplementation libs.hamcrest
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
+ testImplementation project(':raft').sourceSets.test.output
}
task processMessages(type:JavaExec) {
@@ -1267,7 +1269,6 @@ project(':raft') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
- implementation project(':metadata')
implementation libs.slf4jApi
implementation libs.jacksonDatabind
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7cf51eb7418d1..27d8f3e0e7979 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -217,12 +217,15 @@
+
+
+
@@ -234,6 +237,8 @@
+
+
@@ -243,6 +248,8 @@
+
+
@@ -292,7 +299,6 @@
-
@@ -418,7 +424,6 @@
-
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index f0d37be8a6837..01d30c6eae029 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -27,7 +27,6 @@ import kafka.server.{KafkaConfig, MetaProperties}
import kafka.utils.timer.SystemTimer
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.ApiMessage
@@ -35,8 +34,9 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
-import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest}
+import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest}
import org.apache.kafka.server.common.serialization.RecordSerde
import scala.jdk.CollectionConverters._
@@ -100,6 +100,10 @@ trait RaftManager[T] {
epoch: Int,
records: Seq[T]
): Option[Long]
+
+ def leaderAndEpoch: LeaderAndEpoch
+
+ def client: RaftClient[T]
}
class KafkaRaftManager[T](
@@ -125,10 +129,10 @@ class KafkaRaftManager[T](
private val dataDir = createDataDir()
private val metadataLog = buildMetadataLog()
private val netChannel = buildNetworkChannel()
- private val raftClient = buildRaftClient()
- private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
+ val client: KafkaRaftClient[T] = buildRaftClient()
+ private val raftIoThread = new RaftIoThread(client, threadNamePrefix)
- def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+ def kafkaRaftClient: KafkaRaftClient[T] = client
def startup(): Unit = {
// Update the voter endpoints (if valid) with what's in RaftConfig
@@ -151,7 +155,7 @@ class KafkaRaftManager[T](
def shutdown(): Unit = {
raftIoThread.shutdown()
- raftClient.close()
+ client.close()
scheduler.shutdown()
netChannel.close()
metadataLog.close()
@@ -160,7 +164,7 @@ class KafkaRaftManager[T](
override def register(
listener: RaftClient.Listener[T]
): Unit = {
- raftClient.register(listener)
+ client.register(listener)
}
override def scheduleAtomicAppend(
@@ -183,9 +187,9 @@ class KafkaRaftManager[T](
isAtomic: Boolean
): Option[Long] = {
val offset = if (isAtomic) {
- raftClient.scheduleAtomicAppend(epoch, records.asJava)
+ client.scheduleAtomicAppend(epoch, records.asJava)
} else {
- raftClient.scheduleAppend(epoch, records.asJava)
+ client.scheduleAppend(epoch, records.asJava)
}
Option(offset).map(Long.unbox)
@@ -202,7 +206,7 @@ class KafkaRaftManager[T](
createdTimeMs
)
- raftClient.handle(inboundRequest)
+ client.handle(inboundRequest)
inboundRequest.completion.thenApply { response =>
response.data
@@ -307,4 +311,7 @@ class KafkaRaftManager[T](
)
}
+ override def leaderAndEpoch: LeaderAndEpoch = {
+ client.leaderAndEpoch
+ }
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 49edbfe295068..08b4653e1684f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -18,9 +18,9 @@
package kafka.server
import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
import java.net.InetAddress
import kafka.cluster.Broker.ServerInfo
@@ -29,6 +29,7 @@ import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinato
import kafka.log.LogManager
import kafka.metrics.KafkaYammerMetrics
import kafka.network.SocketServer
+import kafka.raft.RaftManager
import kafka.security.CredentialProvider
import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
import kafka.utils.{CoreUtils, KafkaScheduler}
@@ -43,10 +44,10 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -55,16 +56,16 @@ import scala.jdk.CollectionConverters._
* A Kafka broker that runs in KRaft (Kafka Raft) mode.
*/
class BrokerServer(
- val config: KafkaConfig,
- val metaProps: MetaProperties,
- val metaLogManager: MetaLogManager,
- val time: Time,
- val metrics: Metrics,
- val threadNamePrefix: Option[String],
- val initialOfflineDirs: Seq[String],
- val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
- val supportedFeatures: util.Map[String, VersionRange]
- ) extends KafkaBroker {
+ val config: KafkaConfig,
+ val metaProps: MetaProperties,
+ val raftManager: RaftManager[ApiMessageAndVersion],
+ val time: Time,
+ val metrics: Metrics,
+ val threadNamePrefix: Option[String],
+ val initialOfflineDirs: Seq[String],
+ val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+ val supportedFeatures: util.Map[String, VersionRange]
+) extends KafkaBroker {
import kafka.server.Server._
@@ -181,7 +182,7 @@ class BrokerServer(
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
- val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
+ val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
clientToControllerChannelManager = BrokerToControllerChannelManager(
controllerNodeProvider,
@@ -284,7 +285,7 @@ class BrokerServer(
metaProps.clusterId, networkListeners, supportedFeatures)
// Register a listener with the Raft layer to receive metadata event notifications
- metaLogManager.register(brokerMetadataListener)
+ raftManager.register(brokerMetadataListener)
val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
var interBrokerListener: Endpoint = null
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 5834a17942286..974cce1ae93f5 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicReference
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.raft.RaftManager
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.common.Node
@@ -31,9 +32,10 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import scala.collection.Seq
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait ControllerNodeProvider {
@@ -77,15 +79,14 @@ class MetadataCacheControllerNodeProvider(
}
object RaftControllerNodeProvider {
- def apply(metaLogManager: MetaLogManager,
+ def apply(raftManager: RaftManager[ApiMessageAndVersion],
config: KafkaConfig,
controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
-
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.listenerSecurityProtocolMap.getOrElse(controllerListenerName, SecurityProtocol.forName(controllerListenerName.value()))
val controllerSaslMechanism = config.saslMechanismControllerProtocol
new RaftControllerNodeProvider(
- metaLogManager,
+ raftManager,
controllerQuorumVoterNodes,
controllerListenerName,
controllerSecurityProtocol,
@@ -98,7 +99,7 @@ object RaftControllerNodeProvider {
* Finds the controller node by checking the metadata log manager.
* This provider is used when we are using a Raft-based metadata quorum.
*/
-class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
+class RaftControllerNodeProvider(val raftManager: RaftManager[ApiMessageAndVersion],
controllerQuorumVoterNodes: Seq[Node],
val listenerName: ListenerName,
val securityProtocol: SecurityProtocol,
@@ -107,14 +108,7 @@ class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap
override def get(): Option[Node] = {
- val leader = metaLogManager.leader()
- if (leader == null) {
- None
- } else if (leader.nodeId() < 0) {
- None
- } else {
- idToNode.get(leader.nodeId())
- }
+ raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)
}
}
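
Note on the new lookup in RaftControllerNodeProvider.get(): the controller is now read straight off the raft layer's LeaderAndEpoch, whose leaderId() is an OptionalInt that is empty when no leader is known. A rough Java sketch of the same pattern, assuming a RaftClient handle and an id-to-Node map (the class and method names here are illustrative, not part of this patch):

import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.Node;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;

final class ControllerLookup {
    // Mirrors RaftControllerNodeProvider.get(): empty when there is no elected
    // leader, or when the leader id is not one of the known voter nodes.
    static Optional<Node> currentController(RaftClient<ApiMessageAndVersion> raftClient,
                                            Map<Integer, Node> idToNode) {
        LeaderAndEpoch leaderAndEpoch = raftClient.leaderAndEpoch();
        return leaderAndEpoch.leaderId().isPresent()
            ? Optional.ofNullable(idToNode.get(leaderAndEpoch.leaderId().getAsInt()))
            : Optional.empty();
    }
}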
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 638e23ac84c4c..3f574bc1a4bc3 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -17,9 +17,10 @@
package kafka.server
-import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util
import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{CompletableFuture, TimeUnit}
+
import kafka.cluster.Broker.ServerInfo
import kafka.log.LogConfig
import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
@@ -37,7 +38,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumControllerMetrics}
import org.apache.kafka.metadata.VersionRange
-import org.apache.kafka.metalog.MetaLogManager
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
@@ -49,15 +49,14 @@ import scala.jdk.CollectionConverters._
* A Kafka controller that runs in KRaft (Kafka Raft) mode.
*/
class ControllerServer(
- val metaProperties: MetaProperties,
- val config: KafkaConfig,
- val metaLogManager: MetaLogManager,
- val raftManager: RaftManager[ApiMessageAndVersion],
- val time: Time,
- val metrics: Metrics,
- val threadNamePrefix: Option[String],
- val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
- ) extends Logging with KafkaMetricsGroup {
+ val metaProperties: MetaProperties,
+ val config: KafkaConfig,
+ val raftManager: RaftManager[ApiMessageAndVersion],
+ val time: Time,
+ val metrics: Metrics,
+ val threadNamePrefix: Option[String],
+ val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
+) extends Logging with KafkaMetricsGroup {
import kafka.server.Server._
val lock = new ReentrantLock()
@@ -148,7 +147,7 @@ class ControllerServer(
setTime(time).
setThreadNamePrefix(threadNamePrefixAsString).
setConfigDefs(configDefs).
- setLogManager(metaLogManager).
+ setRaftClient(raftManager.client).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 1f96eaa7bacf6..8e77357383f9b 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -24,10 +24,10 @@ import kafka.metrics.{KafkaMetricsReporter, KafkaYammerMetrics}
import kafka.raft.KafkaRaftManager
import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
-import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.RaftConfig
-import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
import org.apache.kafka.server.common.ApiMessageAndVersion
import scala.collection.Seq
@@ -55,7 +55,7 @@ class KafkaRaftServer(
private val metrics = Server.initializeMetrics(
config,
time,
- metaProps.clusterId.toString
+ metaProps.clusterId
)
private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
@@ -73,13 +73,11 @@ class KafkaRaftServer(
controllerQuorumVotersFuture
)
- private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
-
private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
Some(new BrokerServer(
config,
metaProps,
- metaLogShim,
+ raftManager,
time,
metrics,
threadNamePrefix,
@@ -95,7 +93,6 @@ class KafkaRaftServer(
Some(new ControllerServer(
metaProps,
config,
- metaLogShim,
raftManager,
time,
metrics,
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 8d07f8ea9fc0f..6dfaa1800406c 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -16,8 +16,8 @@
*/
package kafka.server.metadata
-import java.util
import java.util.concurrent.TimeUnit
+
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.metrics.KafkaMetricsGroup
@@ -27,9 +27,12 @@ import org.apache.kafka.common.metadata.MetadataRecordType._
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.utils.{LogContext, Time}
-import org.apache.kafka.metalog.{MetaLogLeader, MetaLogListener}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.SnapshotReader
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
object BrokerMetadataListener{
@@ -37,16 +40,17 @@ object BrokerMetadataListener{
val MetadataBatchSizes = "MetadataBatchSizes"
}
-class BrokerMetadataListener(brokerId: Int,
- time: Time,
- metadataCache: RaftMetadataCache,
- configRepository: CachedConfigRepository,
- groupCoordinator: GroupCoordinator,
- replicaManager: RaftReplicaManager,
- txnCoordinator: TransactionCoordinator,
- threadNamePrefix: Option[String],
- clientQuotaManager: ClientQuotaMetadataManager
- ) extends MetaLogListener with KafkaMetricsGroup {
+class BrokerMetadataListener(
+ brokerId: Int,
+ time: Time,
+ metadataCache: RaftMetadataCache,
+ configRepository: CachedConfigRepository,
+ groupCoordinator: GroupCoordinator,
+ replicaManager: RaftReplicaManager,
+ txnCoordinator: TransactionCoordinator,
+ threadNamePrefix: Option[String],
+ clientQuotaManager: ClientQuotaMetadataManager
+) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
private val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
logIdent = logContext.logPrefix()
@@ -73,21 +77,42 @@ class BrokerMetadataListener(brokerId: Int,
/**
* Handle new metadata records.
*/
- override def handleCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
- eventQueue.append(new HandleCommitsEvent(lastOffset, records))
+ override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
+ eventQueue.append(new HandleCommitsEvent(reader))
+ }
+
+ /**
+ * Handle metadata snapshots
+ */
+ override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): Unit = {
+ // Loading snapshot on the broker is currently not supported.
+ reader.close();
+ throw new UnsupportedOperationException(s"Loading snapshot (${reader.snapshotId()}) is not supported")
}
- // Visible for testing. It's useful to execute events synchronously
- private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessage]): Unit = {
- new HandleCommitsEvent(lastOffset, records).run()
+ // Visible for testing. It's useful to execute events synchronously in order
+ // to make tests deterministic. This object is responsible for closing the reader.
+ private[metadata] def execCommits(batchReader: BatchReader[ApiMessageAndVersion]): Unit = {
+ new HandleCommitsEvent(batchReader).run()
}
- class HandleCommitsEvent(lastOffset: Long,
- records: util.List[ApiMessage])
- extends EventQueue.FailureLoggingEvent(log) {
+ class HandleCommitsEvent(
+ reader: BatchReader[ApiMessageAndVersion]
+ ) extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
+ try {
+ apply(reader.next())
+ } finally {
+ reader.close()
+ }
+ }
+
+ private def apply(batch: Batch[ApiMessageAndVersion]): Unit = {
+ val records = batch.records
+ val lastOffset = batch.lastOffset
+
if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: handling ${records.size()} record(s).")
+ debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).")
}
val imageBuilder =
MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
@@ -100,37 +125,37 @@ class BrokerMetadataListener(brokerId: Int,
trace("Metadata batch %d: processing [%d/%d]: %s.".format(lastOffset, index + 1,
records.size(), record.toString))
}
- handleMessage(imageBuilder, record, lastOffset)
+ handleMessage(imageBuilder, record.message, lastOffset)
} catch {
- case e: Exception => error(s"Unable to handle record ${index} in batch " +
- s"ending at offset ${lastOffset}", e)
+ case e: Exception => error(s"Unable to handle record $index in batch " +
+ s"ending at offset $lastOffset", e)
}
index = index + 1
}
if (imageBuilder.hasChanges) {
val newImage = imageBuilder.build()
if (isTraceEnabled) {
- trace(s"Metadata batch ${lastOffset}: creating new metadata image ${newImage}")
+ trace(s"Metadata batch $lastOffset: creating new metadata image ${newImage}")
} else if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: creating new metadata image")
+ debug(s"Metadata batch $lastOffset: creating new metadata image")
}
metadataCache.image(newImage)
} else if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: no new metadata image required.")
+ debug(s"Metadata batch $lastOffset: no new metadata image required.")
}
if (imageBuilder.hasPartitionChanges) {
if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: applying partition changes")
+ debug(s"Metadata batch $lastOffset: applying partition changes")
}
replicaManager.handleMetadataRecords(imageBuilder, lastOffset,
RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
} else if (isDebugEnabled) {
- debug(s"Metadata batch ${lastOffset}: no partition changes found.")
+ debug(s"Metadata batch $lastOffset: no partition changes found.")
}
_highestMetadataOffset = lastOffset
val endNs = time.nanoseconds()
val deltaUs = TimeUnit.MICROSECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS)
- debug(s"Metadata batch ${lastOffset}: advanced highest metadata offset in ${deltaUs} " +
+ debug(s"Metadata batch $lastOffset: advanced highest metadata offset in ${deltaUs} " +
"microseconds.")
batchProcessingTimeHist.update(deltaUs)
}
@@ -234,21 +259,17 @@ class BrokerMetadataListener(brokerId: Int,
clientQuotaManager.handleQuotaRecord(record)
}
- class HandleNewLeaderEvent(leader: MetaLogLeader)
+ class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
val imageBuilder =
MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
- if (leader.nodeId() < 0) {
- imageBuilder.controllerId(None)
- } else {
- imageBuilder.controllerId(Some(leader.nodeId()))
- }
+ imageBuilder.controllerId(leaderAndEpoch.leaderId.asScala)
metadataCache.image(imageBuilder.build())
}
}
- override def handleNewLeader(leader: MetaLogLeader): Unit = {
+ override def handleLeaderChange(leader: LeaderAndEpoch): Unit = {
eventQueue.append(new HandleNewLeaderEvent(leader))
}
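
With this change BrokerMetadataListener implements the raft layer's RaftClient.Listener contract directly: handleCommit hands over a BatchReader that the listener must drain and close, handleSnapshot delivers a SnapshotReader (which the broker currently rejects), and handleLeaderChange replaces the old handleNewLeader/handleRenounce pair. A minimal Java sketch of that contract, with a hypothetical LoggingListener standing in for the real listener:

import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;

class LoggingListener implements RaftClient.Listener<ApiMessageAndVersion> {
    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        // The listener owns the reader and must close it after draining the batches.
        try {
            while (reader.hasNext()) {
                Batch<ApiMessageAndVersion> batch = reader.next();
                System.out.println("committed " + batch.records().size() +
                    " record(s), last offset " + batch.lastOffset());
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        // A listener that does not load snapshots should still close the reader.
        reader.close();
    }

    @Override
    public void handleLeaderChange(LeaderAndEpoch leader) {
        System.out.println("leader is now " + leader.leaderId() + " in epoch " + leader.epoch());
    }
}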
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 75796ad13f57c..c495dbca5fc79 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.metadata.{MetadataJsonConverters, MetadataRecordT
import org.apache.kafka.common.protocol.ByteBufferAccessor
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.MetadataRecordSerde
import scala.jdk.CollectionConverters._
import scala.collection.mutable
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index c21f4b7e86c63..611b9268bb41d 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -19,6 +19,7 @@ package kafka.tools
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
+
import joptsimple.OptionException
import kafka.network.SocketServer
import kafka.raft.{KafkaRaftManager, RaftManager}
@@ -35,7 +36,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
-import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig}
+import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.snapshot.SnapshotReader
@@ -165,12 +166,12 @@ class TestRaftServer(
raftManager.register(this)
- override def handleClaim(epoch: Int): Unit = {
- eventQueue.offer(HandleClaim(epoch))
- }
-
- override def handleResign(epoch: Int): Unit = {
- eventQueue.offer(HandleResign)
+ override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = {
+ if (newLeaderAndEpoch.isLeader(config.nodeId)) {
+ eventQueue.offer(HandleClaim(newLeaderAndEpoch.epoch))
+ } else if (claimedEpoch.isDefined) {
+ eventQueue.offer(HandleResign)
+ }
}
override def handleCommit(reader: BatchReader[Array[Byte]]): Unit = {
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 87083a240c2d0..9c7e4e0b7e089 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -36,11 +36,9 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.raft.RaftConfig;
-import org.apache.kafka.raft.metadata.MetaLogRaftShim;
-import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,11 +173,9 @@ public KafkaClusterTestKit build() throws Exception {
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
- MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
ControllerServer controller = new ControllerServer(
nodes.controllerProperties(node.id()),
config,
- metaLogShim,
raftManager,
Time.SYSTEM,
new Metrics(),
@@ -228,11 +224,10 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS
KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
- MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
BrokerServer broker = new BrokerServer(
config,
nodes.brokerProperties(node.id()),
- metaLogShim,
+ raftManager,
Time.SYSTEM,
new Metrics(),
Option.apply(threadNamePrefix),
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 545fe48afa544..89ba5f1d03328 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -25,9 +25,11 @@ import kafka.server.RaftReplicaManager
import kafka.utils.Implicits._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
-import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.raft.Batch
+import org.apache.kafka.raft.internals.MemoryBatchReader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers._
@@ -39,6 +41,7 @@ import scala.jdk.CollectionConverters._
class BrokerMetadataListenerTest {
+ private val leaderEpoch = 5
private val brokerId = 1
private val time = new MockTime()
private val configRepository = new CachedConfigRepository
@@ -82,11 +85,10 @@ class BrokerMetadataListenerTest {
): Unit = {
val deleteRecord = new RemoveTopicRecord()
.setTopicId(topicId)
- lastMetadataOffset += 1
- listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessage](
- deleteRecord,
- ).asJava)
+ applyBatch(List[ApiMessageAndVersion](
+ new ApiMessageAndVersion(deleteRecord, 0.toShort),
+ ))
assertFalse(metadataCache.contains(topic))
assertEquals(new Properties, configRepository.topicConfig(topic))
@@ -108,6 +110,25 @@ class BrokerMetadataListenerTest {
assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
}
+ private def applyBatch(
+ records: List[ApiMessageAndVersion]
+ ): Unit = {
+ val baseOffset = lastMetadataOffset + 1
+ lastMetadataOffset += records.size
+ listener.execCommits(
+ new MemoryBatchReader(
+ List(
+ Batch.of(
+ baseOffset,
+ leaderEpoch,
+ records.asJava
+ )
+ ).asJava,
+ reader => ()
+ )
+ )
+ }
+
private def createAndAssert(
topicId: Uuid,
topic: String,
@@ -115,11 +136,10 @@ class BrokerMetadataListenerTest {
numPartitions: Int,
numBrokers: Int
): Set[TopicPartition] = {
- val records = new java.util.ArrayList[ApiMessage]
- records.add(new TopicRecord()
+ val records = mutable.ListBuffer.empty[ApiMessageAndVersion]
+ records += new ApiMessageAndVersion(new TopicRecord()
.setName(topic)
- .setTopicId(topicId)
- )
+ .setTopicId(topicId), 0)
val localTopicPartitions = mutable.Set.empty[TopicPartition]
(0 until numPartitions).map { partitionId =>
@@ -134,28 +154,25 @@ class BrokerMetadataListenerTest {
localTopicPartitions.add(new TopicPartition(topic, partitionId))
}
- records.add(new PartitionRecord()
+ records += new ApiMessageAndVersion(new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(partitionId)
.setLeader(preferredLeaderId)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
.setReplicas(replicas)
- .setIsr(replicas)
- )
+ .setIsr(replicas), 0)
}
topicConfig.forKeyValue { (key, value) =>
- records.add(new ConfigRecord()
+ records += new ApiMessageAndVersion(new ConfigRecord()
.setResourceName(topic)
.setResourceType(ConfigResource.Type.TOPIC.id())
.setName(key)
- .setValue(value)
- )
+ .setValue(value), 0)
}
- lastMetadataOffset += records.size()
- listener.execCommits(lastOffset = lastMetadataOffset, records)
+ applyBatch(records.toList)
assertTrue(metadataCache.contains(topic))
assertEquals(Some(numPartitions), metadataCache.numPartitions(topic))
assertEquals(topicConfig, configRepository.topicConfig(topic).asScala)
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 329c00f6f0ee5..bcfe75438c2e3 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRe
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft.metadata.MetadataRecordSerde
+import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 94cced927f24e..746d9068488d9 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -17,22 +17,6 @@
package org.apache.kafka.controller;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalLong;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigDef;
@@ -80,15 +64,36 @@
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
-import org.apache.kafka.metalog.MetaLogManager;
-import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -120,13 +125,12 @@ static public class Builder {
private String threadNamePrefix = null;
private LogContext logContext = null;
private Map configDefs = Collections.emptyMap();
- private MetaLogManager logManager = null;
+ private RaftClient<ApiMessageAndVersion> raftClient = null;
private Map supportedFeatures = Collections.emptyMap();
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private Function snapshotWriterBuilder;
- private SnapshotReader snapshotReader;
private long sessionTimeoutNs = NANOSECONDS.convert(18, TimeUnit.SECONDS);
private ControllerMetrics controllerMetrics = null;
@@ -154,8 +158,8 @@ public Builder setConfigDefs(Map configDefs) {
return this;
}
- public Builder setLogManager(MetaLogManager logManager) {
- this.logManager = logManager;
+ public Builder setRaftClient(RaftClient<ApiMessageAndVersion> logManager) {
+ this.raftClient = logManager;
return this;
}
@@ -184,11 +188,6 @@ public Builder setSnapshotWriterBuilder(Function snapshotW
return this;
}
- public Builder setSnapshotReader(SnapshotReader snapshotReader) {
- this.snapshotReader = snapshotReader;
- return this;
- }
-
public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
this.sessionTimeoutNs = sessionTimeoutNs;
return this;
@@ -201,8 +200,8 @@ public Builder setMetrics(ControllerMetrics controllerMetrics) {
@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
- if (logManager == null) {
- throw new RuntimeException("You must set a metadata log manager.");
+ if (raftClient == null) {
+ throw new RuntimeException("You must set a raft client.");
}
if (threadNamePrefix == null) {
threadNamePrefix = String.format("Node%d_", nodeId);
@@ -217,21 +216,16 @@ public QuorumController build() throws Exception {
if (snapshotWriterBuilder == null) {
snapshotWriterBuilder = new NoOpSnapshotWriterBuilder();
}
- if (snapshotReader == null) {
- snapshotReader = new EmptySnapshotReader(-1);
- }
KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
return new QuorumController(logContext, nodeId, queue, time, configDefs,
- logManager, supportedFeatures, defaultReplicationFactor,
+ raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacer, snapshotWriterBuilder,
- snapshotReader, sessionTimeoutNs, controllerMetrics);
+ sessionTimeoutNs, controllerMetrics);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
throw e;
- } finally {
- Utils.closeQuietly(snapshotReader, "snapshotReader");
}
}
}
@@ -240,12 +234,12 @@ public QuorumController build() throws Exception {
"The active controller appears to be node ";
private NotControllerException newNotControllerException() {
- int latestController = logManager.leader().nodeId();
- if (latestController < 0) {
- return new NotControllerException("No controller appears to be active.");
- } else {
+ OptionalInt latestController = raftClient.leaderAndEpoch().leaderId();
+ if (latestController.isPresent()) {
return new NotControllerException(ACTIVE_CONTROLLER_EXCEPTION_TEXT_PREFIX +
- latestController);
+ latestController.getAsInt());
+ } else {
+ return new NotControllerException("No controller appears to be active.");
}
}
@@ -536,7 +530,7 @@ CompletableFuture future() {
public void run() throws Exception {
long now = time.nanoseconds();
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
- long controllerEpoch = curClaimEpoch;
+ int controllerEpoch = curClaimEpoch;
if (controllerEpoch == -1) {
throw newNotControllerException();
}
@@ -565,19 +559,19 @@ public void run() throws Exception {
} else {
// If the operation returned a batch of records, those records need to be
// written before we can return our result to the user. Here, we hand off
- // the batch of records to the metadata log manager. They will be written
- // out asynchronously.
+ // the batch of records to the raft client. They will be written out
+ // asynchronously.
final long offset;
if (result.isAtomic()) {
- offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
+ offset = raftClient.scheduleAtomicAppend(controllerEpoch, result.records());
} else {
- offset = logManager.scheduleWrite(controllerEpoch, result.records());
+ offset = raftClient.scheduleAppend(controllerEpoch, result.records());
}
op.processBatchEndOffset(offset);
writeOffset = offset;
resultAndOffset = ControllerResultAndOffset.of(offset, result);
for (ApiMessageAndVersion message : result.records()) {
- replay(message.message(), -1, offset);
+ replay(message.message(), Optional.empty(), offset);
}
snapshotRegistry.createSnapshot(offset);
log.debug("Read-write operation {} will be completed when the log " +
@@ -623,50 +617,126 @@ private CompletableFuture appendWriteEvent(String name,
return event.future();
}
- class QuorumMetaLogListener implements MetaLogListener {
+ class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
+
@Override
- public void handleCommits(long offset, List<ApiMessage> messages) {
- appendControlEvent("handleCommits[" + offset + "]", () -> {
- if (curClaimEpoch == -1) {
- // If the controller is a standby, replay the records that were
- // created by the active controller.
- if (log.isDebugEnabled()) {
- if (log.isTraceEnabled()) {
- log.trace("Replaying commits from the active node up to " +
- "offset {}: {}.", offset, messages.stream().
- map(m -> m.toString()).collect(Collectors.joining(", ")));
+ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+ appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> {
+ try {
+ boolean isActiveController = curClaimEpoch != -1;
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+ long offset = batch.lastOffset();
+ List<ApiMessageAndVersion> messages = batch.records();
+
+ if (isActiveController) {
+ // If the controller is active, the records were already replayed,
+ // so we don't need to do it here.
+ log.debug("Completing purgatory items up to offset {}.", offset);
+
+ // Complete any events in the purgatory that were waiting for this offset.
+ purgatory.completeUpTo(offset);
+
+ // Delete all the in-memory snapshots that we no longer need.
+ // If we are writing a new snapshot, then we need to keep that around;
+ // otherwise, we should delete up to the current committed offset.
+ snapshotRegistry.deleteSnapshotsUpTo(
+ Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
+
} else {
- log.debug("Replaying commits from the active node up to " +
- "offset {}.", offset);
+ // If the controller is a standby, replay the records that were
+ // created by the active controller.
+ if (log.isDebugEnabled()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Replaying commits from the active node up to " +
+ "offset {}: {}.", offset, messages.stream()
+ .map(ApiMessageAndVersion::toString)
+ .collect(Collectors.joining(", ")));
+ } else {
+ log.debug("Replaying commits from the active node up to " +
+ "offset {}.", offset);
+ }
+ }
+ for (ApiMessageAndVersion messageAndVersion : messages) {
+ replay(messageAndVersion.message(), Optional.empty(), offset);
+ }
}
+ lastCommittedOffset = offset;
}
- for (ApiMessage message : messages) {
- replay(message, -1, offset);
+ } finally {
+ reader.close();
+ }
+ });
+ }
+
+ @Override
+ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ appendControlEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
+ try {
+ boolean isActiveController = curClaimEpoch != -1;
+ if (isActiveController) {
+ throw new IllegalStateException(
+ String.format(
+ "Asked to load snasphot (%s) when it is the active controller (%s)",
+ reader.snapshotId(),
+ curClaimEpoch
+ )
+ );
}
- } else {
- // If the controller is active, the records were already replayed,
- // so we don't need to do it here.
- log.debug("Completing purgatory items up to offset {}.", offset);
-
- // Complete any events in the purgatory that were waiting for this offset.
- purgatory.completeUpTo(offset);
-
- // Delete all the in-memory snapshots that we no longer need.
- // If we are writing a new snapshot, then we need to keep that around;
- // otherwise, we should delete up to the current committed offset.
- snapshotRegistry.deleteSnapshotsUpTo(
- Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
+ if (lastCommittedOffset != -1) {
+ throw new IllegalStateException(
+ String.format(
+ "Asked to re-load snapshot (%s) after processing records up to %s",
+ reader.snapshotId(),
+ lastCommittedOffset
+ )
+ );
+ }
+
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+ long offset = batch.lastOffset();
+ List<ApiMessageAndVersion> messages = batch.records();
+
+ if (log.isDebugEnabled()) {
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Replaying snapshot ({}) batch with last offset of {}: {}",
+ reader.snapshotId(),
+ offset,
+ messages
+ .stream()
+ .map(ApiMessageAndVersion::toString)
+ .collect(Collectors.joining(", "))
+ );
+ } else {
+ log.debug(
+ "Replaying snapshot ({}) batch with last offset of {}",
+ reader.snapshotId(),
+ offset
+ );
+ }
+ }
+
+ for (ApiMessageAndVersion messageAndVersion : messages) {
+ replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
+ }
+ }
+
+ lastCommittedOffset = reader.snapshotId().offset - 1;
+ snapshotRegistry.createSnapshot(lastCommittedOffset);
+ } finally {
+ reader.close();
}
- lastCommittedOffset = offset;
});
}
@Override
- public void handleNewLeader(MetaLogLeader newLeader) {
- if (newLeader.nodeId() == nodeId) {
- final long newEpoch = newLeader.epoch();
+ public void handleLeaderChange(LeaderAndEpoch newLeader) {
+ if (newLeader.isLeader(nodeId)) {
+ final int newEpoch = newLeader.epoch();
appendControlEvent("handleClaim[" + newEpoch + "]", () -> {
- long curEpoch = curClaimEpoch;
+ int curEpoch = curClaimEpoch;
if (curEpoch != -1) {
throw new RuntimeException("Tried to claim controller epoch " +
newEpoch + ", but we never renounced controller epoch " +
@@ -678,19 +748,14 @@ public void handleNewLeader(MetaLogLeader newLeader) {
writeOffset = lastCommittedOffset;
clusterControl.activate();
});
- }
- }
-
- @Override
- public void handleRenounce(long oldEpoch) {
- appendControlEvent("handleRenounce[" + oldEpoch + "]", () -> {
- if (curClaimEpoch == oldEpoch) {
+ } else if (curClaimEpoch != -1) {
+ appendControlEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
- "log event. Reverting to last committed offset {}.", curClaimEpoch,
- lastCommittedOffset);
+ "log event. Reverting to last committed offset {}.", curClaimEpoch,
+ lastCommittedOffset);
renounce();
- }
- });
+ });
+ }
}
@Override
@@ -753,7 +818,7 @@ private void cancelMaybeFenceReplicas() {
}
@SuppressWarnings("unchecked")
- private void replay(ApiMessage message, long snapshotEpoch, long offset) {
+ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
try {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
@@ -794,12 +859,12 @@ private void replay(ApiMessage message, long snapshotEpoch, long offset) {
throw new RuntimeException("Unhandled record type " + type);
}
} catch (Exception e) {
- if (snapshotEpoch < 0) {
- log.error("Error replaying record {} at offset {}.",
- message.toString(), offset, e);
+ if (snapshotId.isPresent()) {
+ log.error("Error replaying record {} from snapshot {} at last offset {}.",
+ message.toString(), snapshotId.get(), offset, e);
} else {
- log.error("Error replaying record {} from snapshot {} at index {}.",
- message.toString(), snapshotEpoch, offset, e);
+ log.error("Error replaying record {} at last offset {}.",
+ message.toString(), offset, e);
}
}
}
@@ -878,7 +943,7 @@ private void replay(ApiMessage message, long snapshotEpoch, long offset) {
/**
* The interface that we use to mutate the Raft log.
*/
- private final MetaLogManager logManager;
+ private final RaftClient<ApiMessageAndVersion> raftClient;
/**
* The interface that receives callbacks from the Raft log. These callbacks are
@@ -891,7 +956,7 @@ private void replay(ApiMessage message, long snapshotEpoch, long offset) {
* Otherwise, this is -1. This variable must be modified only from the controller
* thread, but it can be read from other threads.
*/
- private volatile long curClaimEpoch;
+ private volatile int curClaimEpoch;
/**
* The last offset we have committed, or -1 if we have not committed any offsets.
@@ -908,15 +973,14 @@ private QuorumController(LogContext logContext,
KafkaEventQueue queue,
Time time,
Map configDefs,
- MetaLogManager logManager,
+ RaftClient<ApiMessageAndVersion> raftClient,
Map supportedFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
ReplicaPlacer replicaPlacer,
Function snapshotWriterBuilder,
- SnapshotReader snapshotReader,
long sessionTimeoutNs,
- ControllerMetrics controllerMetrics) throws Exception {
+ ControllerMetrics controllerMetrics) {
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
@@ -935,21 +999,14 @@ private QuorumController(LogContext logContext,
this.replicationControl = new ReplicationControlManager(snapshotRegistry,
logContext, defaultReplicationFactor, defaultNumPartitions,
configurationControl, clusterControl, controllerMetrics);
- this.logManager = logManager;
+ this.raftClient = raftClient;
this.metaLogListener = new QuorumMetaLogListener();
- this.curClaimEpoch = -1L;
- this.lastCommittedOffset = snapshotReader.epoch();
+ this.curClaimEpoch = -1;
+ this.lastCommittedOffset = -1L;
this.writeOffset = -1L;
- while (snapshotReader.hasNext()) {
- List<ApiMessage> batch = snapshotReader.next();
- long index = 0;
- for (ApiMessage message : batch) {
- replay(message, snapshotReader.epoch(), index++);
- }
- }
snapshotRegistry.createSnapshot(lastCommittedOffset);
- this.logManager.register(metaLogListener);
+ this.raftClient.register(metaLogListener);
}
@Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
similarity index 96%
rename from raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java
rename to metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
index 123de4c4853d6..4a4d0ee9981c4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.metadata;
+package org.apache.kafka.metadata;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.ApiMessage;
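
MetadataRecordSerde only moves packages here (org.apache.kafka.raft.metadata to org.apache.kafka.metadata), so callers such as DumpLogSegments just update their import. A hedged Java sketch of a serialize/deserialize round trip with the relocated serde, assuming the RecordSerde interface keeps the recordSize/write/read shape used elsewhere in this patch; the helper class is illustrative only:

import java.nio.ByteBuffer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.server.common.ApiMessageAndVersion;

final class SerdeRoundTrip {
    static ApiMessageAndVersion roundTrip(ApiMessageAndVersion record) {
        MetadataRecordSerde serde = new MetadataRecordSerde();
        ObjectSerializationCache cache = new ObjectSerializationCache();
        int size = serde.recordSize(record, cache);            // size the record first
        ByteBuffer buffer = ByteBuffer.allocate(size);
        serde.write(record, cache, new ByteBufferAccessor(buffer));
        buffer.flip();
        return serde.read(new ByteBufferAccessor(buffer), size);
    }

    public static void main(String[] args) {
        ApiMessageAndVersion record = new ApiMessageAndVersion(
            new TopicRecord().setName("foo").setTopicId(Uuid.randomUuid()), (short) 0);
        System.out.println(roundTrip(record));
    }
}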
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java
deleted file mode 100644
index 2bf4f7c718bd5..0000000000000
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metalog;
-
-import java.util.Objects;
-
-/**
- * The current leader of the MetaLog.
- */
-public class MetaLogLeader {
- private final int nodeId;
- private final long epoch;
-
- public MetaLogLeader(int nodeId, long epoch) {
- this.nodeId = nodeId;
- this.epoch = epoch;
- }
-
- public int nodeId() {
- return nodeId;
- }
-
- public long epoch() {
- return epoch;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof MetaLogLeader)) return false;
- MetaLogLeader other = (MetaLogLeader) o;
- return other.nodeId == nodeId && other.epoch == epoch;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(nodeId, epoch);
- }
-
- @Override
- public String toString() {
- return "MetaLogLeader(nodeId=" + nodeId + ", epoch=" + epoch + ")";
- }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java
deleted file mode 100644
index 93744202dc90f..0000000000000
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metalog;
-
-import org.apache.kafka.common.protocol.ApiMessage;
-
-import java.util.List;
-
-/**
- * Listeners receive notifications from the MetaLogManager.
- */
-public interface MetaLogListener {
- /**
- * Called when the MetaLogManager commits some messages.
- *
- * @param lastOffset The last offset found in all the given messages.
- * @param messages The messages.
- */
- void handleCommits(long lastOffset, List<ApiMessage> messages);
-
- /**
- * Called when a new leader is elected.
- *
- * @param leader The new leader id and epoch.
- */
- default void handleNewLeader(MetaLogLeader leader) {}
-
- /**
- * Called when the MetaLogManager has renounced the leadership.
- *
- * @param epoch The controller epoch that has ended.
- */
- default void handleRenounce(long epoch) {}
-
- /**
- * Called when the MetaLogManager has finished shutting down, and wants to tell its
- * listener that it is safe to shut down as well.
- */
- default void beginShutdown() {}
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
deleted file mode 100644
index 1ba358c4f06c2..0000000000000
--- a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metalog;
-
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-
-import java.util.List;
-
-/**
- * The MetaLogManager handles storing metadata and electing leaders.
- */
-public interface MetaLogManager {
-
- /**
- * Start this meta log manager.
- * The manager must be ready to accept incoming calls after this function returns.
- * It is an error to initialize a MetaLogManager more than once.
- */
- void initialize() throws Exception;
-
- /**
- * Register the listener. The manager must be initialized already.
- * The listener must be ready to accept incoming calls immediately.
- *
- * @param listener The listener to register.
- */
- void register(MetaLogListener listener) throws Exception;
-
- /**
- * Schedule a write to the log.
- *
- * The write will be scheduled to happen at some time in the future. There is no
- * error return or exception thrown if the write fails. Instead, the listener may
- * regard the write as successful if and only if the MetaLogManager reaches the given
- * offset before renouncing its leadership. The listener should determine this by
- * monitoring the committed offsets.
- *
- * @param epoch the controller epoch
- * @param batch the batch of messages to write
- *
- * @return the offset of the last message in the batch
- * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
- */
- long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
-
- /**
- * Schedule a atomic write to the log.
- *
- * The write will be scheduled to happen at some time in the future. All of the messages in batch
- * will be appended atomically in one batch. The listener may regard the write as successful
- * if and only if the MetaLogManager reaches the given offset before renouncing its leadership.
- * The listener should determine this by monitoring the committed offsets.
- *
- * @param epoch the controller epoch
- * @param batch the batch of messages to write
- *
- * @return the offset of the last message in the batch
- * @throws IllegalArgumentException if buffer allocatio failed and the client should backoff
- */
- long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch);
-
- /**
- * Renounce the leadership.
- *
- * @param epoch The epoch. If this does not match the current epoch, this
- * call will be ignored.
- */
- void renounce(long epoch);
-
- /**
- * Returns the current leader. The active node may change immediately after this
- * function is called, of course.
- */
- MetaLogLeader leader();
-
- /**
- * Returns the node id.
- */
- int nodeId();
-
-}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java b/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java
index de481a0f7f935..53268c8ec4941 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockSnapshotWriter.java
@@ -17,13 +17,20 @@
package org.apache.kafka.controller;
-import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.metadata.MetadataRecordSerde;
+import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.MockRawSnapshotReader;
+import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
+import java.util.concurrent.atomic.AtomicReference;
+import java.nio.ByteBuffer;
class MockSnapshotWriter implements SnapshotWriter {
private final long epoch;
@@ -80,15 +87,24 @@ synchronized List<List<ApiMessageAndVersion>> batches() {
return batches;
}
- public MockSnapshotReader toReader() {
- List> readerBatches = new ArrayList<>();
- for (List batch : batches) {
- List readerBatch = new ArrayList<>();
- for (ApiMessageAndVersion messageAndVersion : batch) {
- readerBatch.add(messageAndVersion.message());
- }
- readerBatches.add(readerBatch);
+ public MockRawSnapshotReader toReader() {
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(epoch, 0);
+ AtomicReference<ByteBuffer> buffer = new AtomicReference<>();
+ int maxBufferSize = 1024;
+ try (org.apache.kafka.snapshot.SnapshotWriter<ApiMessageAndVersion> snapshotWriter =
+ new org.apache.kafka.snapshot.SnapshotWriter<>(
+ new MockRawSnapshotWriter(snapshotId, buffer::set),
+ maxBufferSize,
+ MemoryPool.NONE,
+ new MockTime(),
+ CompressionType.NONE,
+ new MetadataRecordSerde()
+ )
+ ) {
+ batches.forEach(snapshotWriter::append);
+ snapshotWriter.freeze();
}
- return new MockSnapshotReader(epoch, readerBatches.iterator());
+
+ return new MockRawSnapshotReader(snapshotId, buffer.get());
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 83ab5f9f3a2c0..c6114dee0891b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -33,8 +34,8 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
@@ -53,18 +54,18 @@
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.metadata.PartitionRecord;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@@ -91,7 +92,7 @@ public class QuorumControllerTest {
*/
@Test
public void testCreateAndClose() throws Throwable {
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, __ -> { })) {
}
@@ -103,7 +104,7 @@ public void testCreateAndClose() throws Throwable {
*/
@Test
public void testConfigurationOperations() throws Throwable {
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
testConfigurationOperations(controlEnv.activeController());
@@ -134,7 +135,7 @@ private void testConfigurationOperations(QuorumController controller) throws Thr
*/
@Test
public void testDelayedConfigurationOperations() throws Throwable {
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@@ -160,7 +161,7 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
@Test
public void testUnregisterBroker() throws Throwable {
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
ListenerCollection listeners = new ListenerCollection();
@@ -229,7 +230,7 @@ public void testSnapshotSaveAndLoad() throws Throwable {
MockSnapshotWriter writer = null;
Map brokerEpochs = new HashMap<>();
Uuid fooId;
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3)) {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS).
setSnapshotWriterBuilder(snapshotWriterBuilder))) {
@@ -272,11 +273,11 @@ public void testSnapshotSaveAndLoad() throws Throwable {
writer.waitForCompletion();
checkSnapshotContents(fooId, brokerEpochs, writer.batches().iterator());
}
+ }
- final MockSnapshotReader reader = writer.toReader();
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.of(writer.toReader()))) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS).
- setSnapshotReader(reader).
setSnapshotWriterBuilder(snapshotWriterBuilder))) {
QuorumController active = controlEnv.activeController();
long snapshotEpoch = active.beginWritingSnapshot().get();
@@ -347,7 +348,7 @@ private void checkSnapshotContents(Uuid fooId,
*/
@Test
public void testTimeouts() throws Throwable {
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
QuorumController controller = controlEnv.activeController();
@@ -403,7 +404,7 @@ private static void assertYieldsTimeout(Future> future) {
*/
@Test
public void testEarlyControllerResults() throws Throwable {
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1)) {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
try (QuorumControllerTestEnv controlEnv =
new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))) {
QuorumController controller = controlEnv.activeController();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 99270422fcf2c..db3acfba2a72f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -41,7 +41,7 @@ public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
try {
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i);
- builder.setLogManager(logEnv.logManagers().get(i));
+ builder.setRaftClient(logEnv.logManagers().get(i));
builderConsumer.accept(builder);
this.controllers.add(builder.build());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
similarity index 98%
rename from raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java
rename to metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
index de844e6e75ffc..77906e7141380 100644
--- a/raft/src/test/java/org/apache/kafka/raft/metadata/MetadataRecordSerdeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.metadata;
+package org.apache.kafka.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.SerializationException;
@@ -69,4 +69,4 @@ public void testDeserializeWithUnhandledFrameVersion() {
() -> serde.read(new ByteBufferAccessor(buffer), 16));
}
-}
\ No newline at end of file
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 6442bb9dbaad7..655b07c033358 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -17,22 +17,34 @@
package org.apache.kafka.metalog;
-import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.raft.internals.MemoryBatchReader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -43,15 +55,15 @@
/**
* The LocalLogManager is a test implementation that relies on the contents of memory.
*/
-public final class LocalLogManager implements MetaLogManager, AutoCloseable {
+public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>, AutoCloseable {
interface LocalBatch {
int size();
}
static class LeaderChangeBatch implements LocalBatch {
- private final MetaLogLeader newLeader;
+ private final LeaderAndEpoch newLeader;
- LeaderChangeBatch(MetaLogLeader newLeader) {
+ LeaderChangeBatch(LeaderAndEpoch newLeader) {
this.newLeader = newLeader;
}
@@ -80,9 +92,11 @@ public String toString() {
}
static class LocalRecordBatch implements LocalBatch {
- private final List records;
+ private final long leaderEpoch;
+ private final List<ApiMessageAndVersion> records;
- LocalRecordBatch(List records) {
+ LocalRecordBatch(long leaderEpoch, List<ApiMessageAndVersion> records) {
+ this.leaderEpoch = leaderEpoch;
this.records = records;
}
@@ -126,16 +140,27 @@ public static class SharedLogData {
/**
* The current leader.
*/
- private MetaLogLeader leader = new MetaLogLeader(-1, -1);
+ private LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
/**
* The start offset of the last batch that was created, or -1 if no batches have
* been created.
*/
- private long prevOffset = -1;
+ private long prevOffset;
+
+ private final Optional<RawSnapshotReader> snapshot;
+
+ public SharedLogData(Optional<RawSnapshotReader> snapshot) {
+ this.snapshot = snapshot;
+ if (snapshot.isPresent()) {
+ prevOffset = snapshot.get().snapshotId().offset - 1;
+ } else {
+ prevOffset = -1;
+ }
+ }
synchronized void registerLogManager(LocalLogManager logManager) {
- if (logManagers.put(logManager.nodeId(), logManager) != null) {
+ if (logManagers.put(logManager.nodeId, logManager) != null) {
throw new RuntimeException("Can't have multiple LocalLogManagers " +
"with id " + logManager.nodeId());
}
@@ -143,7 +168,7 @@ synchronized void registerLogManager(LocalLogManager logManager) {
}
synchronized void unregisterLogManager(LocalLogManager logManager) {
- if (!logManagers.remove(logManager.nodeId(), logManager)) {
+ if (!logManagers.remove(logManager.nodeId, logManager)) {
throw new RuntimeException("Log manager " + logManager.nodeId() +
" was not found.");
}
@@ -155,9 +180,9 @@ synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
"match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
return Long.MAX_VALUE;
}
- if (nodeId != leader.nodeId()) {
+ if (!leader.isLeader(nodeId)) {
log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
- "match the current leader id of {}.", nodeId, epoch, leader.nodeId());
+ "match the current leader id of {}.", nodeId, epoch, leader.leaderId());
return Long.MAX_VALUE;
}
log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
@@ -181,7 +206,7 @@ synchronized long append(LocalBatch batch) {
}
synchronized void electLeaderIfNeeded() {
- if (leader.nodeId() != -1 || logManagers.isEmpty()) {
+ if (leader.leaderId().isPresent() || logManagers.isEmpty()) {
return;
}
int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
@@ -190,7 +215,7 @@ synchronized void electLeaderIfNeeded() {
for (int i = 0; i <= nextLeaderIndex; i++) {
nextLeaderNode = iter.next();
}
- MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
+ LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch() + 1);
log.info("Elected new leader: {}.", newLeader);
append(new LeaderChangeBatch(newLeader));
}
@@ -202,13 +227,26 @@ synchronized Entry nextBatch(long offset) {
}
return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
}
+
+ /**
+ * Optionally return a snapshot reader if the offset is less than the first batch.
+ */
+ Optional<RawSnapshotReader> maybeNextSnapshot(long offset) {
+ return snapshot.flatMap(reader -> {
+ if (offset < reader.snapshotId().offset) {
+ return Optional.of(reader);
+ }
+
+ return Optional.empty();
+ });
+ }
}
private static class MetaLogListenerData {
private long offset = -1;
- private final MetaLogListener listener;
+ private final RaftClient.Listener<ApiMessageAndVersion> listener;
- MetaLogListenerData(MetaLogListener listener) {
+ MetaLogListenerData(RaftClient.Listener<ApiMessageAndVersion> listener) {
this.listener = listener;
}
}
@@ -254,7 +292,7 @@ private static class MetaLogListenerData {
/**
* The current leader, as seen by this log manager.
*/
- private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
+ private volatile LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.empty(), 0);
public LocalLogManager(LogContext logContext,
int nodeId,
@@ -274,6 +312,21 @@ private void scheduleLogCheck() {
int numEntriesFound = 0;
for (MetaLogListenerData listenerData : listeners) {
while (true) {
+ // Load the snapshot if needed
+ Optional<RawSnapshotReader> snapshot = shared.maybeNextSnapshot(listenerData.offset);
+ if (snapshot.isPresent()) {
+ log.trace("Node {}: handling snapshot with id {}.", nodeId, snapshot.get().snapshotId());
+ listenerData.listener.handleSnapshot(
+ SnapshotReader.of(
+ snapshot.get(),
+ new MetadataRecordSerde(),
+ BufferSupplier.create(),
+ Integer.MAX_VALUE
+ )
+ );
+ listenerData.offset = snapshot.get().snapshotId().offset - 1;
+ }
+
Entry entry = shared.nextBatch(listenerData.offset);
if (entry == null) {
log.trace("Node {}: reached the end of the log after finding " +
@@ -291,7 +344,7 @@ private void scheduleLogCheck() {
LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
log.trace("Node {}: handling LeaderChange to {}.",
nodeId, batch.newLeader);
- listenerData.listener.handleNewLeader(batch.newLeader);
+ listenerData.listener.handleLeaderChange(batch.newLeader);
if (batch.newLeader.epoch() > leader.epoch()) {
leader = batch.newLeader;
}
@@ -299,7 +352,18 @@ private void scheduleLogCheck() {
LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
log.trace("Node {}: handling LocalRecordBatch with offset {}.",
nodeId, entryOffset);
- listenerData.listener.handleCommits(entryOffset, batch.records);
+ listenerData.listener.handleCommit(
+ new MemoryBatchReader<>(
+ Collections.singletonList(
+ Batch.of(
+ entryOffset - batch.records.size() + 1,
+ Math.toIntExact(batch.leaderEpoch),
+ batch.records
+ )
+ ),
+ reader -> { }
+ )
+ );
}
numEntriesFound++;
listenerData.offset = entryOffset;
@@ -317,7 +381,7 @@ public void beginShutdown() {
try {
if (initialized && !shutdown) {
log.debug("Node {}: beginning shutdown.", nodeId);
- renounce(leader.epoch());
+ resign(leader.epoch());
for (MetaLogListenerData listenerData : listeners) {
listenerData.listener.beginShutdown();
}
@@ -331,14 +395,38 @@ public void beginShutdown() {
}
@Override
- public void close() throws InterruptedException {
+ public void close() {
log.debug("Node {}: closing.", nodeId);
beginShutdown();
- eventQueue.close();
+
+ try {
+ eventQueue.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Shutdown the log manager.
+ *
+ * Even though the API suggests a non-blocking shutdown, this method always returns a completed
+ * future. This means that shutdown is a blocking operation.
+ */
+ @Override
+ public CompletableFuture<Void> shutdown(int timeoutMs) {
+ CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+ try {
+ close();
+ shutdownFuture.complete(null);
+ } catch (Throwable t) {
+ shutdownFuture.completeExceptionally(t);
+ }
+ return shutdownFuture;
}
@Override
- public void initialize() throws Exception {
+ public void initialize() {
eventQueue.append(() -> {
log.debug("initialized local log manager for node " + nodeId);
initialized = true;
@@ -346,7 +434,7 @@ public void initialize() throws Exception {
}
@Override
- public void register(MetaLogListener listener) throws Exception {
+ public void register(RaftClient.Listener<ApiMessageAndVersion> listener) {
CompletableFuture future = new CompletableFuture<>();
eventQueue.append(() -> {
if (shutdown) {
@@ -366,47 +454,54 @@ public void register(MetaLogListener listener) throws Exception {
"LocalLogManager was not initialized."));
}
});
- future.get();
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
- public long scheduleWrite(long epoch, List batch) {
- return scheduleAtomicWrite(epoch, batch);
+ public Long scheduleAppend(int epoch, List<ApiMessageAndVersion> batch) {
+ return scheduleAtomicAppend(epoch, batch);
}
@Override
- public long scheduleAtomicWrite(long epoch, List batch) {
+ public Long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
return shared.tryAppend(
nodeId,
leader.epoch(),
- new LocalRecordBatch(
- batch
- .stream()
- .map(ApiMessageAndVersion::message)
- .collect(Collectors.toList())
- )
+ new LocalRecordBatch(leader.epoch(), batch)
);
}
@Override
- public void renounce(long epoch) {
- MetaLogLeader curLeader = leader;
- MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
+ public void resign(int epoch) {
+ LeaderAndEpoch curLeader = leader;
+ LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1);
shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
}
@Override
- public MetaLogLeader leader() {
+ public SnapshotWriter<ApiMessageAndVersion> createSnapshot(OffsetAndEpoch snapshotId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LeaderAndEpoch leaderAndEpoch() {
return leader;
}
@Override
- public int nodeId() {
- return nodeId;
+ public OptionalInt nodeId() {
+ return OptionalInt.of(nodeId);
}
- public List listeners() {
- final CompletableFuture> future = new CompletableFuture<>();
+ public List<RaftClient.Listener<ApiMessageAndVersion>> listeners() {
+ final CompletableFuture<List<RaftClient.Listener<ApiMessageAndVersion>>> future = new CompletableFuture<>();
eventQueue.append(() -> {
future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
});
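Note on the change above: LocalLogManager now delivers callbacks through the generic RaftClient.Listener contract, replaying an optional snapshot first (handleSnapshot), then committed batches (handleCommit), with leadership updates arriving via handleLeaderChange. Below is a minimal sketch of a listener written against that contract; the class and field names (OffsetTrackingListener, lastCommittedOffset) are illustrative assumptions, while the callback signatures and reader accessors mirror those used elsewhere in this diff.

import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.SnapshotReader;

// Hypothetical listener that tracks the last committed offset across snapshot and log replay.
class OffsetTrackingListener implements RaftClient.Listener<ApiMessageAndVersion> {
    private volatile long lastCommittedOffset = -1;

    @Override
    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        try {
            while (reader.hasNext()) {
                reader.next(); // apply the snapshot batch to the state machine here
            }
            // The snapshot covers everything up to, but not including, snapshotId().offset.
            lastCommittedOffset = reader.snapshotId().offset - 1;
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        try {
            while (reader.hasNext()) {
                Batch<ApiMessageAndVersion> batch = reader.next();
                // apply batch.records() to the state machine here
                lastCommittedOffset = batch.lastOffset();
            }
        } finally {
            reader.close();
        }
    }

    @Override
    public void handleLeaderChange(LeaderAndEpoch leader) {
        // leader.leaderId() may be empty when the epoch changes before a new leader is known
    }

    long lastCommittedOffset() {
        return lastCommittedOffset;
    }
}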
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index e13ebfe953526..4d4e5101da994 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -18,15 +18,16 @@
package org.apache.kafka.metalog;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
import java.util.stream.Collectors;
import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
@@ -37,7 +38,6 @@
@Timeout(value = 40)
public class LocalLogManagerTest {
- private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);
/**
* Test creating a LocalLogManager and closing it.
@@ -45,7 +45,7 @@ public class LocalLogManagerTest {
@Test
public void testCreateAndClose() throws Exception {
try (LocalLogManagerTestEnv env =
- LocalLogManagerTestEnv.createWithMockListeners(1)) {
+ LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) {
env.close();
assertEquals(null, env.firstError.get());
}
@@ -57,8 +57,8 @@ public void testCreateAndClose() throws Exception {
@Test
public void testClaimsLeadership() throws Exception {
try (LocalLogManagerTestEnv env =
- LocalLogManagerTestEnv.createWithMockListeners(1)) {
- assertEquals(new MetaLogLeader(0, 0), env.waitForLeader());
+ LocalLogManagerTestEnv.createWithMockListeners(1, Optional.empty())) {
+ assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader());
env.close();
assertEquals(null, env.firstError.get());
}
@@ -70,12 +70,16 @@ public void testClaimsLeadership() throws Exception {
@Test
public void testPassLeadership() throws Exception {
try (LocalLogManagerTestEnv env =
- LocalLogManagerTestEnv.createWithMockListeners(3)) {
- MetaLogLeader first = env.waitForLeader();
- MetaLogLeader cur = first;
+ LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) {
+ LeaderAndEpoch first = env.waitForLeader();
+ LeaderAndEpoch cur = first;
do {
- env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
- MetaLogLeader next = env.waitForLeader();
+ int currentLeaderId = cur.leaderId().orElseThrow(() ->
+ new AssertionError("Current leader is undefined")
+ );
+ env.logManagers().get(currentLeaderId).resign(cur.epoch());
+
+ LeaderAndEpoch next = env.waitForLeader();
while (next.epoch() == cur.epoch()) {
Thread.sleep(1);
next = env.waitForLeader();
@@ -84,7 +88,7 @@ public void testPassLeadership() throws Exception {
assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch +
", but found " + next);
cur = next;
- } while (cur.nodeId() == first.nodeId());
+ } while (cur.leaderId().equals(first.leaderId()));
env.close();
assertEquals(null, env.firstError.get());
}
@@ -120,15 +124,19 @@ private static void waitForLastCommittedOffset(long targetOffset,
@Test
public void testCommits() throws Exception {
try (LocalLogManagerTestEnv env =
- LocalLogManagerTestEnv.createWithMockListeners(3)) {
- MetaLogLeader leaderInfo = env.waitForLeader();
- LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
- long epoch = activeLogManager.leader().epoch();
+ LocalLogManagerTestEnv.createWithMockListeners(3, Optional.empty())) {
+ LeaderAndEpoch leaderInfo = env.waitForLeader();
+ int leaderId = leaderInfo.leaderId().orElseThrow(() ->
+ new AssertionError("Current leader is undefined")
+ );
+
+ LocalLogManager activeLogManager = env.logManagers().get(leaderId);
+ int epoch = activeLogManager.leaderAndEpoch().epoch();
List messages = Arrays.asList(
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
- assertEquals(3, activeLogManager.scheduleWrite(epoch, messages));
+ assertEquals(3, activeLogManager.scheduleAppend(epoch, messages));
for (LocalLogManager logManager : env.logManagers()) {
waitForLastCommittedOffset(3, logManager);
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 52aeea052bdde..9282f42237d66 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -20,6 +20,8 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +30,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
public class LocalLogManagerTestEnv implements AutoCloseable {
@@ -55,11 +58,14 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
*/
private final List logManagers;
- public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) throws Exception {
- LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
+ public static LocalLogManagerTestEnv createWithMockListeners(
+ int numManagers,
+ Optional<RawSnapshotReader> snapshot
+ ) throws Exception {
+ LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers, snapshot);
try {
for (LocalLogManager logManager : testEnv.logManagers) {
- logManager.register(new MockMetaLogManagerListener());
+ logManager.register(new MockMetaLogManagerListener(logManager.nodeId().getAsInt()));
}
} catch (Exception e) {
testEnv.close();
@@ -68,9 +74,9 @@ public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) th
return testEnv;
}
- public LocalLogManagerTestEnv(int numManagers) throws Exception {
+ public LocalLogManagerTestEnv(int numManagers, Optional<RawSnapshotReader> snapshot) throws Exception {
dir = TestUtils.tempDirectory();
- shared = new SharedLogData();
+ shared = new SharedLogData(snapshot);
List newLogManagers = new ArrayList<>(numManagers);
try {
for (int nodeId = 0; nodeId < numManagers; nodeId++) {
@@ -100,16 +106,17 @@ File dir() {
return dir;
}
- MetaLogLeader waitForLeader() throws InterruptedException {
- AtomicReference value = new AtomicReference<>(null);
+ LeaderAndEpoch waitForLeader() throws InterruptedException {
+ AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
- MetaLogLeader result = null;
+ LeaderAndEpoch result = null;
for (LocalLogManager logManager : logManagers) {
- MetaLogLeader leader = logManager.leader();
- if (leader.nodeId() == logManager.nodeId()) {
+ LeaderAndEpoch leader = logManager.leaderAndEpoch();
+ int nodeId = logManager.nodeId().getAsInt();
+ if (leader.isLeader(nodeId)) {
if (result != null) {
- throw new RuntimeException("node " + leader.nodeId() +
- " thinks it's the leader, but so does " + result.nodeId());
+ throw new RuntimeException("node " + nodeId +
+ " thinks it's the leader, but so does " + result.leaderId());
}
result = leader;
}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
index fe61ec070285b..3195a634e8083 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -18,46 +18,90 @@
package org.apache.kafka.metalog;
import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import java.util.ArrayList;
import java.util.List;
+import java.util.OptionalInt;
-public class MockMetaLogManagerListener implements MetaLogListener {
+public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessageAndVersion> {
public static final String COMMIT = "COMMIT";
public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
public static final String NEW_LEADER = "NEW_LEADER";
public static final String RENOUNCE = "RENOUNCE";
public static final String SHUTDOWN = "SHUTDOWN";
+ public static final String SNAPSHOT = "SNAPSHOT";
+ private final int nodeId;
private final List serializedEvents = new ArrayList<>();
+ private LeaderAndEpoch leaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);
+
+ public MockMetaLogManagerListener(int nodeId) {
+ this.nodeId = nodeId;
+ }
@Override
- public synchronized void handleCommits(long lastCommittedOffset, List messages) {
- for (ApiMessage message : messages) {
- StringBuilder bld = new StringBuilder();
- bld.append(COMMIT).append(" ").append(message.toString());
- serializedEvents.add(bld.toString());
+ public synchronized void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+ try {
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+ long lastCommittedOffset = batch.lastOffset();
+
+ for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+ ApiMessage message = messageAndVersion.message();
+ StringBuilder bld = new StringBuilder();
+ bld.append(COMMIT).append(" ").append(message.toString());
+ serializedEvents.add(bld.toString());
+ }
+ StringBuilder bld = new StringBuilder();
+ bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
+ serializedEvents.add(bld.toString());
+ }
+ } finally {
+ reader.close();
}
- StringBuilder bld = new StringBuilder();
- bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
- serializedEvents.add(bld.toString());
}
@Override
- public void handleNewLeader(MetaLogLeader leader) {
- StringBuilder bld = new StringBuilder();
- bld.append(NEW_LEADER).append(" ").
- append(leader.nodeId()).append(" ").append(leader.epoch());
- synchronized (this) {
- serializedEvents.add(bld.toString());
+ public synchronized void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ long lastCommittedOffset = reader.snapshotId().offset - 1;
+ try {
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+
+ for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+ ApiMessage message = messageAndVersion.message();
+ StringBuilder bld = new StringBuilder();
+ bld.append(SNAPSHOT).append(" ").append(message.toString());
+ serializedEvents.add(bld.toString());
+ }
+ StringBuilder bld = new StringBuilder();
+ bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
+ serializedEvents.add(bld.toString());
+ }
+ } finally {
+ reader.close();
}
}
@Override
- public void handleRenounce(long epoch) {
- StringBuilder bld = new StringBuilder();
- bld.append(RENOUNCE).append(" ").append(epoch);
- synchronized (this) {
+ public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch) {
+ LeaderAndEpoch oldLeaderAndEpoch = this.leaderAndEpoch;
+ this.leaderAndEpoch = newLeaderAndEpoch;
+
+ if (newLeaderAndEpoch.isLeader(nodeId)) {
+ StringBuilder bld = new StringBuilder();
+ bld.append(NEW_LEADER).append(" ").
+ append(nodeId).append(" ").append(newLeaderAndEpoch.epoch());
+ serializedEvents.add(bld.toString());
+ } else if (oldLeaderAndEpoch.isLeader(nodeId)) {
+ StringBuilder bld = new StringBuilder();
+ bld.append(RENOUNCE).append(" ").append(newLeaderAndEpoch.epoch());
serializedEvents.add(bld.toString());
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index f1a928d55c714..e0e7cb449357e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -38,8 +38,6 @@
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FetchResponse;
-import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
@@ -51,10 +49,12 @@
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -358,17 +358,15 @@ private void maybeFireHandleCommit(long baseOffset, int epoch, List records)
}
}
- private void maybeFireHandleClaim(LeaderState state) {
- int leaderEpoch = state.epoch();
- long epochStartOffset = state.epochStartOffset();
+ private void maybeFireLeaderChange(LeaderState state) {
for (ListenerContext listenerContext : listenerContexts) {
- listenerContext.maybeFireHandleClaim(leaderEpoch, epochStartOffset);
+ listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch(), state.epochStartOffset());
}
}
- private void fireHandleResign(int epoch) {
+ private void maybeFireLeaderChange() {
for (ListenerContext listenerContext : listenerContexts) {
- listenerContext.fireHandleResign(epoch);
+ listenerContext.maybeFireLeaderChange(quorum.leaderAndEpoch());
}
}
@@ -409,6 +407,11 @@ public LeaderAndEpoch leaderAndEpoch() {
return quorum.leaderAndEpoch();
}
+ @Override
+ public OptionalInt nodeId() {
+ return quorum.localId();
+ }
+
private OffsetAndEpoch endOffset() {
return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch());
}
@@ -432,6 +435,7 @@ private void onBecomeLeader(long currentTimeMs) throws IOException {
);
LeaderState state = quorum.transitionToLeader(endOffset, accumulator);
+ maybeFireLeaderChange(state);
log.initializeLeaderEpoch(quorum.epoch());
@@ -467,33 +471,28 @@ private void onBecomeCandidate(long currentTimeMs) throws IOException {
}
}
- private void maybeResignLeadership() {
- if (quorum.isLeader()) {
- fireHandleResign(quorum.epoch());
- }
- }
-
private void transitionToCandidate(long currentTimeMs) throws IOException {
- maybeResignLeadership();
quorum.transitionToCandidate();
+ maybeFireLeaderChange();
onBecomeCandidate(currentTimeMs);
}
private void transitionToUnattached(int epoch) throws IOException {
- maybeResignLeadership();
quorum.transitionToUnattached(epoch);
+ maybeFireLeaderChange();
resetConnections();
}
private void transitionToResigned(List preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(Errors.BROKER_NOT_AVAILABLE.exception("The broker is shutting down"));
quorum.transitionToResigned(preferredSuccessors);
+ maybeFireLeaderChange();
resetConnections();
}
private void transitionToVoted(int candidateId, int epoch) throws IOException {
- maybeResignLeadership();
quorum.transitionToVoted(epoch, candidateId);
+ maybeFireLeaderChange();
resetConnections();
}
@@ -517,8 +516,8 @@ private void transitionToFollower(
int leaderId,
long currentTimeMs
) throws IOException {
- maybeResignLeadership();
quorum.transitionToFollower(epoch, leaderId);
+ maybeFireLeaderChange();
onBecomeFollower(currentTimeMs);
}
@@ -1922,7 +1921,7 @@ private long pollResigned(long currentTimeMs) throws IOException {
private long pollLeader(long currentTimeMs) {
LeaderState state = quorum.leaderStateOrThrow();
- maybeFireHandleClaim(state);
+ maybeFireLeaderChange(state);
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
@@ -2260,6 +2259,11 @@ public CompletableFuture shutdown(int timeoutMs) {
return shutdownComplete;
}
+ @Override
+ public void resign(int epoch) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public SnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
return new SnapshotWriter<>(
@@ -2328,7 +2332,7 @@ public void complete() {
private final class ListenerContext implements CloseListener> {
private final RaftClient.Listener listener;
// This field is used only by the Raft IO thread
- private int claimedEpoch = 0;
+ private LeaderAndEpoch lastFiredLeaderChange = new LeaderAndEpoch(OptionalInt.empty(), 0);
// These fields are visible to both the Raft IO thread and the listener
// and are protected through synchronization on this `ListenerContext` instance
@@ -2420,19 +2424,33 @@ private void fireHandleCommit(BatchReader reader) {
listener.handleCommit(reader);
}
- void maybeFireHandleClaim(int epoch, long epochStartOffset) {
- // We can fire `handleClaim` as soon as the listener has caught
- // up to the start of the leader epoch. This guarantees that the
- // state machine has seen the full committed state before it becomes
- // leader and begins writing to the log.
- if (epoch > claimedEpoch && nextOffset() >= epochStartOffset) {
- claimedEpoch = epoch;
- listener.handleClaim(epoch);
+ void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+ if (shouldFireLeaderChange(leaderAndEpoch)) {
+ lastFiredLeaderChange = leaderAndEpoch;
+ listener.handleLeaderChange(leaderAndEpoch);
+ }
+ }
+
+ private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+ if (leaderAndEpoch.equals(lastFiredLeaderChange)) {
+ return false;
+ } else if (leaderAndEpoch.epoch() > lastFiredLeaderChange.epoch()) {
+ return true;
+ } else {
+ return leaderAndEpoch.leaderId().isPresent() &&
+ !lastFiredLeaderChange.leaderId().isPresent();
}
}
- void fireHandleResign(int epoch) {
- listener.handleResign(epoch);
+ void maybeFireLeaderChange(LeaderAndEpoch leaderAndEpoch, long epochStartOffset) {
+ // If this node is becoming the leader, then we can fire `handleLeaderChange` as soon
+ // as the listener has caught up to the start of the leader epoch. This guarantees
+ // that the state machine has seen the full committed state before it becomes
+ // leader and begins writing to the log.
+ if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) {
+ lastFiredLeaderChange = leaderAndEpoch;
+ listener.handleLeaderChange(leaderAndEpoch);
+ }
}
public synchronized void onClose(BatchReader reader) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
index 47bd404ae2798..fee0c2feaf5c7 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
@@ -20,14 +20,26 @@
import java.util.OptionalInt;
public class LeaderAndEpoch {
- public final OptionalInt leaderId;
- public final int epoch;
+ private final OptionalInt leaderId;
+ private final int epoch;
public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
this.leaderId = Objects.requireNonNull(leaderId);
this.epoch = epoch;
}
+ public OptionalInt leaderId() {
+ return leaderId;
+ }
+
+ public int epoch() {
+ return epoch;
+ }
+
+ public boolean isLeader(int nodeId) {
+ return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -41,4 +53,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(leaderId, epoch);
}
+
+ @Override
+ public String toString() {
+ return "LeaderAndEpoch(" +
+ "leaderId=" + leaderId +
+ ", epoch=" + epoch +
+ ')';
+ }
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 7aaa708be05f3..26c0446af6d09 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -223,6 +223,10 @@ public int localIdOrThrow() {
return localId.orElseThrow(() -> new IllegalStateException("Required local id is not present"));
}
+ public OptionalInt localId() {
+ return localId;
+ }
+
public int epoch() {
return state.epoch();
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 7293f796eb772..2f880f2d1d69d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -20,6 +20,7 @@
import org.apache.kafka.snapshot.SnapshotWriter;
import java.util.List;
+import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
public interface RaftClient extends AutoCloseable {
@@ -56,24 +57,29 @@ interface Listener {
void handleSnapshot(SnapshotReader reader);
/**
- * Invoked after this node has become a leader. This is only called after
- * all commits up to the start of the leader's epoch have been sent to
- * {@link #handleCommit(BatchReader)}.
+ * Called on any change to leadership. This includes both when a leader is elected and
+ * when a leader steps down or fails.
*
- * After becoming a leader, the client is eligible to write to the log
- * using {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}.
+ * If this node is the leader, then the notification of leadership will be delayed until
+ * the implementation of this interface has caught up to the high-watermark through calls to
+ * {@link #handleSnapshot(SnapshotReader)} and {@link #handleCommit(BatchReader)}.
*
- * @param epoch the claimed leader epoch
- */
- default void handleClaim(int epoch) {}
-
- /**
- * Invoked after a leader has stepped down. This callback may or may not
- * fire before the next leader has been elected.
+ * If this node is not the leader, then this method will be called as soon as possible. In
+ * this case the leader may or may not be known for the current epoch.
+ *
+ * Subsequent calls to this method will expose a monotonically increasing epoch. For a
+ * given epoch the leader may be unknown ({@code leader.leaderId} is {@code OptionalInt#empty})
+ * or known ({@code leader.leaderId} is {@code OptionalInt#of}). Once a leader is known for
+ * a given epoch, it will remain the leader for that epoch. In other words, the implementation
+ * of this method should expect to be called at most twice for each epoch: once when the epoch
+ * changes but the leader is not yet known, and once when the leader becomes known for the
+ * current epoch.
*
- * @param epoch the epoch that the leader is resigning from
+ * @param leader the current leader and epoch
*/
- default void handleResign(int epoch) {}
+ default void handleLeaderChange(LeaderAndEpoch leader) {}
+
+ default void beginShutdown() {}
}
/**
@@ -94,6 +100,14 @@ default void handleResign(int epoch) {}
*/
LeaderAndEpoch leaderAndEpoch();
+ /**
+ * Get local nodeId if one is defined. This may be absent when the client is used
+ * as an anonymous observer, as in the case of the metadata shell.
+ *
+ * @return optional node id
+ */
+ OptionalInt nodeId();
+
/**
* Append a list of records to the log. The write will be scheduled for some time
* in the future. There is no guarantee that appended records will be written to
@@ -155,6 +169,16 @@ default void handleResign(int epoch) {}
*/
CompletableFuture shutdown(int timeoutMs);
+ /**
+ * Resign the leadership. The leader will give up its leadership in the current epoch,
+ * and a new election will be held. Note that nothing prevents this leader from getting
+ * reelected.
+ *
+ * @param epoch the epoch to resign from. If this does not match the current epoch, this
+ * call will be ignored.
+ */
+ void resign(int epoch);
+
/**
* Create a writable snapshot file for a given offset and epoch.
*
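Note on the change above: RaftClient now exposes nodeId() and resign(int) alongside leaderAndEpoch(). The sketch below shows how a caller might combine them; the helper name LeadershipHelper is a hypothetical assumption, and since KafkaRaftClient currently throws UnsupportedOperationException from resign(), the pattern only applies to implementations that support resignation, such as the test LocalLogManager earlier in this diff.

import java.util.OptionalInt;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;

// Hypothetical helper: resign leadership only if this node currently leads the epoch.
final class LeadershipHelper {
    static void resignIfLeader(RaftClient<ApiMessageAndVersion> client) {
        OptionalInt nodeId = client.nodeId();
        LeaderAndEpoch current = client.leaderAndEpoch();
        // resign() ignores stale epochs, so pass the epoch observed at the time of the check.
        if (nodeId.isPresent() && current.isLeader(nodeId.getAsInt())) {
            client.resign(current.epoch());
        }
    }
}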
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 634ce37392f17..903a15c8f7339 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -140,17 +140,16 @@ public synchronized void handleSnapshot(SnapshotReader reader) {
}
@Override
- public synchronized void handleClaim(int epoch) {
- log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
- committed, epoch);
- uncommitted = committed;
- claimedEpoch = OptionalInt.of(epoch);
- }
-
- @Override
- public synchronized void handleResign(int epoch) {
- log.debug("Counter uncommitted value reset after resigning leadership");
- this.uncommitted = -1;
- this.claimedEpoch = OptionalInt.empty();
+ public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
+ if (newLeader.isLeader(nodeId)) {
+ log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
+ committed, newLeader);
+ uncommitted = committed;
+ claimedEpoch = OptionalInt.of(newLeader.epoch());
+ } else {
+ log.debug("Counter uncommitted value reset after resigning leadership");
+ uncommitted = -1;
+ claimedEpoch = OptionalInt.empty();
+ }
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
deleted file mode 100644
index dba84123466a8..0000000000000
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft.metadata;
-
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
-import org.apache.kafka.metalog.MetaLogManager;
-import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.BatchReader;
-import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.raft.RaftClient;
-import org.apache.kafka.snapshot.SnapshotReader;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
- * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
- * directly.
- */
-public class MetaLogRaftShim implements MetaLogManager {
- private final RaftClient client;
- private final int nodeId;
-
- public MetaLogRaftShim(RaftClient client, int nodeId) {
- this.client = client;
- this.nodeId = nodeId;
- }
-
- @Override
- public void initialize() {
- // NO-OP - The RaftClient is initialized externally
- }
-
- @Override
- public void register(MetaLogListener listener) {
- client.register(new ListenerShim(listener));
- }
-
- @Override
- public long scheduleAtomicWrite(long epoch, List batch) {
- return write(epoch, batch, true);
- }
-
- @Override
- public long scheduleWrite(long epoch, List batch) {
- return write(epoch, batch, false);
- }
-
- private long write(long epoch, List batch, boolean isAtomic) {
- final Long result;
- if (isAtomic) {
- result = client.scheduleAtomicAppend((int) epoch, batch);
- } else {
- result = client.scheduleAppend((int) epoch, batch);
- }
-
- if (result == null) {
- throw new IllegalArgumentException(
- String.format(
- "Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)",
- epoch,
- batch
- )
- );
- } else {
- return result;
- }
- }
-
- @Override
- public void renounce(long epoch) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public MetaLogLeader leader() {
- LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
- return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
- }
-
- @Override
- public int nodeId() {
- return nodeId;
- }
-
- private class ListenerShim implements RaftClient.Listener {
- private final MetaLogListener listener;
-
- private ListenerShim(MetaLogListener listener) {
- this.listener = listener;
- }
-
- @Override
- public void handleCommit(BatchReader reader) {
- try {
- // TODO: The `BatchReader` might need to read from disk if this is
- // not a leader. We want to move this IO to the state machine so that
- // it does not block Raft replication
- while (reader.hasNext()) {
- Batch batch = reader.next();
- List records = batch.records().stream()
- .map(ApiMessageAndVersion::message)
- .collect(Collectors.toList());
- listener.handleCommits(batch.lastOffset(), records);
- }
- } finally {
- reader.close();
- }
- }
-
- @Override
- public void handleSnapshot(SnapshotReader reader) {
- reader.close();
- }
-
- @Override
- public void handleClaim(int epoch) {
- listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
- }
-
- @Override
- public void handleResign(int epoch) {
- listener.handleRenounce(epoch);
- }
-
- @Override
- public String toString() {
- return "ListenerShim(" +
- "listener=" + listener +
- ')';
- }
- }
-
-}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 2acf287961a61..6e8ff62553654 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -145,7 +145,7 @@ public void testSecondListenerNotified() throws Exception {
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch);
- RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 259957a071125..b000764f08a3f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2336,7 +2336,7 @@ public void testLateRegisteredListenerCatchesUp() throws Exception {
assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));
// Register a second listener and allow it to catch up to the high watermark
- RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.pollUntil(() -> OptionalInt.of(epoch).equals(secondListener.currentClaimedEpoch()));
assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
@@ -2427,7 +2427,7 @@ public void testHandleCommitCallbackFiresInVotedState() throws Exception {
assertEquals(OptionalLong.of(10L), context.client.highWatermark());
// Register another listener and verify that it catches up while we remain 'voted'
- RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, otherNodeId);
@@ -2477,7 +2477,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
context.assertVotedCandidate(candidateEpoch, localId);
// Register another listener and verify that it catches up
- RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
context.client.poll();
context.assertVotedCandidate(candidateEpoch, localId);
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 5feb9435f43d5..ce89223759546 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -27,12 +27,11 @@
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.record.UnalignedMemoryRecords;
-import org.apache.kafka.common.record.UnalignedRecords;
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.MockRawSnapshotReader;
+import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -407,7 +406,9 @@ public void initializeLeaderEpoch(int epoch) {
@Override
public RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
- return new MockRawSnapshotWriter(snapshotId);
+ return new MockRawSnapshotWriter(snapshotId, buffer -> {
+ snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer));
+ });
}
@Override
@@ -615,99 +616,4 @@ private EpochStartOffset(int epoch, long startOffset) {
this.startOffset = startOffset;
}
}
-
- final class MockRawSnapshotWriter implements RawSnapshotWriter {
- private final OffsetAndEpoch snapshotId;
- private ByteBufferOutputStream data;
- private boolean frozen;
-
- public MockRawSnapshotWriter(OffsetAndEpoch snapshotId) {
- this.snapshotId = snapshotId;
- this.data = new ByteBufferOutputStream(0);
- this.frozen = false;
- }
-
- @Override
- public OffsetAndEpoch snapshotId() {
- return snapshotId;
- }
-
- @Override
- public long sizeInBytes() {
- if (frozen) {
- throw new RuntimeException("Snapshot is already frozen " + snapshotId);
- }
- return data.position();
- }
-
- @Override
- public void append(UnalignedMemoryRecords records) {
- if (frozen) {
- throw new RuntimeException("Snapshot is already frozen " + snapshotId);
- }
- data.write(records.buffer());
- }
-
- @Override
- public void append(MemoryRecords records) {
- if (frozen) {
- throw new RuntimeException("Snapshot is already frozen " + snapshotId);
- }
- data.write(records.buffer());
- }
-
- @Override
- public boolean isFrozen() {
- return frozen;
- }
-
- @Override
- public void freeze() {
- if (frozen) {
- throw new RuntimeException("Snapshot is already frozen " + snapshotId);
- }
-
- frozen = true;
- ByteBuffer buffer = data.buffer();
- buffer.flip();
-
- snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer));
- }
-
- @Override
- public void close() {}
- }
-
- final static class MockRawSnapshotReader implements RawSnapshotReader {
- private final OffsetAndEpoch snapshotId;
- private final MemoryRecords data;
-
- MockRawSnapshotReader(OffsetAndEpoch snapshotId, ByteBuffer data) {
- this.snapshotId = snapshotId;
- this.data = MemoryRecords.readableRecords(data);
- }
-
- @Override
- public OffsetAndEpoch snapshotId() {
- return snapshotId;
- }
-
- @Override
- public long sizeInBytes() {
- return data.sizeInBytes();
- }
-
- @Override
- public UnalignedRecords slice(long position, int size) {
- ByteBuffer buffer = data.buffer();
- buffer.position(Math.toIntExact(position));
- buffer.limit(Math.min(buffer.limit(), Math.toIntExact(position + size)));
- return new UnalignedMemoryRecords(buffer.slice());
- }
-
- @Override
- public Records records() {
- return data;
- }
- }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index c4bf9199e5b16..16a4d5acb4777 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -223,7 +223,7 @@ public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel(voters);
LogContext logContext = new LogContext();
- MockListener listener = new MockListener();
+ MockListener listener = new MockListener(localId);
Map<Integer, RaftConfig.AddressSpec> voterAddressMap = voters.stream()
.collect(Collectors.toMap(id -> id, RaftClientTestContext::mockAddress));
RaftConfig raftConfig = new RaftConfig(voterAddressMap, requestTimeoutMs, RETRY_BACKOFF_MS, electionTimeoutMs,
@@ -365,11 +365,11 @@ void becomeLeader() throws Exception {
}
OptionalInt currentLeader() {
- return currentLeaderAndEpoch().leaderId;
+ return currentLeaderAndEpoch().leaderId();
}
int currentEpoch() {
- return currentLeaderAndEpoch().epoch;
+ return currentLeaderAndEpoch().epoch();
}
LeaderAndEpoch currentLeaderAndEpoch() {
@@ -1057,9 +1057,14 @@ static class MockListener implements RaftClient.Listener<String> {
private final List<BatchReader<String>> savedBatches = new ArrayList<>();
private final Map<Integer, Long> claimedEpochStartOffsets = new HashMap<>();
private OptionalInt currentClaimedEpoch = OptionalInt.empty();
+ private final OptionalInt localId;
private Optional<SnapshotReader<String>> snapshot = Optional.empty();
private boolean readCommit = true;
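+ // Carries the local node id so handleLeaderChange can tell a leadership claim by this node apart from a resignation.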
+ MockListener(OptionalInt localId) {
+ this.localId = localId;
+ }
+
int numCommittedBatches() {
return commits.size();
}
@@ -1141,19 +1146,18 @@ void readBatch(BatchReader<String> reader) {
}
@Override
- public void handleClaim(int epoch) {
+ public void handleLeaderChange(LeaderAndEpoch leader) {
// We record the next expected offset as the claimed epoch's start
// offset. This is useful to verify that the `handleClaim` callback
// was not received early.
- long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
- lastCommitOffset().getAsLong() + 1 : 0L;
- this.currentClaimedEpoch = OptionalInt.of(epoch);
- this.claimedEpochStartOffsets.put(epoch, claimedEpochStartOffset);
- }
-
- @Override
- public void handleResign(int epoch) {
- this.currentClaimedEpoch = OptionalInt.empty();
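+ // With handleClaim/handleResign folded into handleLeaderChange, a claim is recorded only when the new leader is this node.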
+ if (localId.isPresent() && leader.isLeader(localId.getAsInt())) {
+ long claimedEpochStartOffset = lastCommitOffset().isPresent() ?
+ lastCommitOffset().getAsLong() + 1 : 0L;
+ this.currentClaimedEpoch = OptionalInt.of(leader.epoch());
+ this.claimedEpochStartOffsets.put(leader.epoch(), claimedEpochStartOffset);
+ } else {
+ this.currentClaimedEpoch = OptionalInt.empty();
+ }
}
@Override
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotReader.java b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotReader.java
new file mode 100644
index 0000000000000..8268f3e957523
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotReader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.UnalignedMemoryRecords;
+import org.apache.kafka.common.record.UnalignedRecords;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
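+/** Test-only snapshot reader that exposes a frozen ByteBuffer as MemoryRecords. */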
+public final class MockRawSnapshotReader implements RawSnapshotReader {
+ private final OffsetAndEpoch snapshotId;
+ private final MemoryRecords data;
+
+ public MockRawSnapshotReader(OffsetAndEpoch snapshotId, ByteBuffer data) {
+ this.snapshotId = snapshotId;
+ this.data = MemoryRecords.readableRecords(data);
+ }
+
+ @Override
+ public OffsetAndEpoch snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return data.sizeInBytes();
+ }
+
+ @Override
+ public UnalignedRecords slice(long position, int size) {
+ ByteBuffer buffer = data.buffer();
+ buffer.position(Math.toIntExact(position));
+ buffer.limit(Math.min(buffer.limit(), Math.toIntExact(position + size)));
+ return new UnalignedMemoryRecords(buffer.slice());
+ }
+
+ @Override
+ public Records records() {
+ return data;
+ }
+}
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
new file mode 100644
index 0000000000000..316ac0aae2585
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.UnalignedMemoryRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
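+/** Test-only in-memory snapshot writer; freezing it hands the completed buffer to the supplied handler. */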
+public final class MockRawSnapshotWriter implements RawSnapshotWriter {
+ private final ByteBufferOutputStream data = new ByteBufferOutputStream(0);
+ private final OffsetAndEpoch snapshotId;
+ private final Consumer frozenHandler;
+
+ private boolean frozen = false;
+
+ public MockRawSnapshotWriter(
+ OffsetAndEpoch snapshotId,
+ Consumer frozenHandler
+ ) {
+ this.snapshotId = snapshotId;
+ this.frozenHandler = frozenHandler;
+ }
+
+ @Override
+ public OffsetAndEpoch snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public long sizeInBytes() {
+ ensureNotFrozen();
+ return data.position();
+ }
+
+ @Override
+ public void append(UnalignedMemoryRecords records) {
+ ensureNotFrozen();
+ data.write(records.buffer());
+ }
+
+ @Override
+ public void append(MemoryRecords records) {
+ ensureNotFrozen();
+ data.write(records.buffer());
+ }
+
+ @Override
+ public boolean isFrozen() {
+ return frozen;
+ }
+
+ @Override
+ public void freeze() {
+ ensureNotFrozen();
+
+ frozen = true;
+ ByteBuffer buffer = data.buffer();
+ buffer.flip();
+
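+ // Hand the frozen buffer to the owner (for example MockLog) so it can register a matching reader.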
+ frozenHandler.accept(buffer);
+ }
+
+ @Override
+ public void close() {}
+
+ private void ensureNotFrozen() {
+ if (frozen) {
+ throw new IllegalStateException("Snapshot is already frozen " + snapshotId);
+ }
+ }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 55986f3c28d88..4a1f71dff8079 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -36,21 +36,19 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -79,13 +77,15 @@ public void setWorkingDirectory(String workingDirectory) {
}
}
- class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
+ class LogListener implements RaftClient.Listener<ApiMessageAndVersion> {
@Override
public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
- // TODO: handle lastOffset
while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
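+ // Mirrors the removed handleCommits callback: log the batch and record its last offset under /metadataQuorum.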
+ log.debug("handleCommits " + batch.records() + " at offset " + batch.lastOffset());
+ DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+ dir.create("offset").setContents(String.valueOf(batch.lastOffset()));
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
handleMessage(messageAndVersion.message());
}
@@ -95,18 +95,6 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
}
}
- @Override
- public void handleCommits(long lastOffset, List<ApiMessage> messages) {
- appendEvent("handleCommits", () -> {
- log.debug("handleCommits " + messages + " at offset " + lastOffset);
- DirectoryNode dir = data.root.mkdirs("metadataQuorum");
- dir.create("offset").setContents(String.valueOf(lastOffset));
- for (ApiMessage message : messages) {
- handleMessage(message);
- }
- }, null);
- }
-
@Override
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
try {
@@ -122,7 +110,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
}
@Override
- public void handleNewLeader(MetaLogLeader leader) {
+ public void handleLeaderChange(LeaderAndEpoch leader) {
appendEvent("handleNewLeader", () -> {
log.debug("handleNewLeader " + leader);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
@@ -130,21 +118,9 @@ public void handleNewLeader(MetaLogLeader leader) {
}, null);
}
- @Override
- public void handleClaim(int epoch) {
- // This shouldn't happen because we should never be the leader.
- log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
- }
-
- @Override
- public void handleRenounce(long epoch) {
- // This shouldn't happen because we should never be the leader.
- log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
- }
-
@Override
public void beginShutdown() {
- log.debug("MetaLogListener sent beginShutdown");
+ log.debug("Metadata log listener sent beginShutdown");
}
}
diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
index 3078505980cc4..9a51f2e8283d1 100644
--- a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
+++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
@@ -18,7 +18,6 @@
package org.apache.kafka.shell;
import org.apache.kafka.common.message.LeaderChangeMessage;
-import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
@@ -26,19 +25,23 @@
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.metalog.MetaLogLeader;
-import org.apache.kafka.metalog.MetaLogListener;
-import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
@@ -49,14 +52,14 @@ public final class SnapshotFileReader implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(SnapshotFileReader.class);
private final String snapshotPath;
- private final MetaLogListener listener;
+ private final RaftClient.Listener<ApiMessageAndVersion> listener;
private final KafkaEventQueue queue;
private final CompletableFuture<Void> caughtUpFuture;
private FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
private final MetadataRecordSerde serde = new MetadataRecordSerde();
- public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
+ public SnapshotFileReader(String snapshotPath, RaftClient.Listener<ApiMessageAndVersion> listener) {
this.snapshotPath = snapshotPath;
this.listener = listener;
this.queue = new KafkaEventQueue(Time.SYSTEM,
@@ -101,7 +104,7 @@ private void handleNextBatch() {
private void scheduleHandleNextBatch() {
queue.append(new EventQueue.Event() {
@Override
- public void run() throws Exception {
+ public void run() {
handleNextBatch();
}
@@ -123,8 +126,10 @@ private void handleControlBatch(FileChannelRecordBatch batch) {
case LEADER_CHANGE:
LeaderChangeMessage message = new LeaderChangeMessage();
message.read(new ByteBufferAccessor(record.value()), (short) 0);
- listener.handleNewLeader(new MetaLogLeader(message.leaderId(),
- batch.partitionLeaderEpoch()));
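+ // Leadership changes now arrive through the unified handleLeaderChange callback rather than MetaLogListener.handleNewLeader.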
+ listener.handleLeaderChange(new LeaderAndEpoch(
+ OptionalInt.of(message.leaderId()),
+ batch.partitionLeaderEpoch()
+ ));
break;
default:
log.error("Ignoring control record with type {} at offset {}",
@@ -137,18 +142,28 @@ private void handleControlBatch(FileChannelRecordBatch batch) {
}
private void handleMetadataBatch(FileChannelRecordBatch batch) {
- List<ApiMessage> messages = new ArrayList<>();
- for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
- Record record = iter.next();
+ List<ApiMessageAndVersion> messages = new ArrayList<>();
+ for (Record record : batch) {
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
try {
ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
- messages.add(messageAndVersion.message());
+ messages.add(messageAndVersion);
} catch (Throwable e) {
log.error("unable to read metadata record at offset {}", record.offset(), e);
}
}
- listener.handleCommits(batch.lastOffset(), messages);
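+ // Wrap the decoded records in a single in-memory batch so the new handleCommit(BatchReader) API can consume them.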
+ listener.handleCommit(
+ new MemoryBatchReader<>(
+ Collections.singletonList(
+ Batch.of(
+ batch.baseOffset(),
+ batch.partitionLeaderEpoch(),
+ messages
+ )
+ ),
+ reader -> { }
+ )
+ );
}
public void beginShutdown(String reason) {