Move reader iteration to event handler
Jason Gustafson committed Apr 7, 2021
1 parent 49934ef commit 2340d7f
Showing 3 changed files with 96 additions and 83 deletions.
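
The change is the same ownership hand-off in both listeners: handleCommit previously iterated the BatchReader on the calling thread, enqueuing one event per batch and closing the reader itself; it now enqueues a single event that owns the reader, so both iteration and close() happen on the event-handler thread. Below is a minimal sketch of that pattern using simplified, invented stand-ins (Reader, ReaderHandoffSketch, and process are not Kafka's API):

import java.util.concurrent.Executors

// Simplified stand-in for org.apache.kafka.raft.BatchReader.
trait Reader[T] extends AutoCloseable {
  def hasNext: Boolean
  def next(): T
}

object ReaderHandoffSketch {
  private val eventHandler = Executors.newSingleThreadExecutor()

  // The commit's pattern: enqueue one event that owns the reader; the
  // event-handler thread iterates the batches and closes the reader,
  // instead of the calling thread doing both.
  def handleCommit[T](reader: Reader[T])(process: T => Unit): Unit = {
    eventHandler.submit(new Runnable {
      override def run(): Unit = {
        try {
          while (reader.hasNext) process(reader.next())
        } finally {
          reader.close() // ownership moved with the event
        }
      }
    })
  }
}
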
@@ -16,7 +16,6 @@
  */
 package kafka.server.metadata
 
-import java.util
 import java.util.concurrent.TimeUnit
 
 import kafka.coordinator.group.GroupCoordinator
@@ -78,28 +77,32 @@ class BrokerMetadataListener(
    * Handle new metadata records.
    */
   override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = {
-    try {
-      while (reader.hasNext) {
-        val batch = reader.next()
-        eventQueue.append(new HandleCommitsEvent(batch.lastOffset, batch.records))
-      }
-    } finally {
-      reader.close()
-    }
-
+    eventQueue.append(new HandleCommitsEvent(reader))
   }
 
-  // Visible for testing. It's useful to execute events synchronously
-  private[metadata] def execCommits(lastOffset: Long, records: util.List[ApiMessageAndVersion]): Unit = {
-    new HandleCommitsEvent(lastOffset, records).run()
+  // Visible for testing. It's useful to execute events synchronously in order
+  // to make tests deterministic
+  private[metadata] def execCommits(batch: BatchReader.Batch[ApiMessageAndVersion]): Unit = {
+    new HandleCommitsEvent(BatchReader.singleton(batch)).run()
   }
 
-  class HandleCommitsEvent(lastOffset: Long,
-                           records: util.List[ApiMessageAndVersion])
-    extends EventQueue.FailureLoggingEvent(log) {
+  class HandleCommitsEvent(
+    reader: BatchReader[ApiMessageAndVersion]
+  ) extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
+      try {
+        apply(reader.next())
+      } finally {
+        reader.close()
+      }
+    }
+
+    private def apply(batch: BatchReader.Batch[ApiMessageAndVersion]): Unit = {
+      val records = batch.records
+      val lastOffset = batch.lastOffset
+
       if (isDebugEnabled) {
-        debug(s"Metadata batch ${lastOffset}: handling ${records.size()} record(s).")
+        debug(s"Metadata batch $lastOffset: handling ${records.size()} record(s).")
       }
       val imageBuilder =
         MetadataImageBuilder(brokerId, log, metadataCache.currentImage())
@@ -114,35 +117,35 @@
           }
           handleMessage(imageBuilder, record.message, lastOffset)
         } catch {
-          case e: Exception => error(s"Unable to handle record ${index} in batch " +
-            s"ending at offset ${lastOffset}", e)
+          case e: Exception => error(s"Unable to handle record $index in batch " +
+            s"ending at offset $lastOffset", e)
         }
         index = index + 1
       }
       if (imageBuilder.hasChanges) {
         val newImage = imageBuilder.build()
         if (isTraceEnabled) {
-          trace(s"Metadata batch ${lastOffset}: creating new metadata image ${newImage}")
+          trace(s"Metadata batch $lastOffset: creating new metadata image ${newImage}")
         } else if (isDebugEnabled) {
-          debug(s"Metadata batch ${lastOffset}: creating new metadata image")
+          debug(s"Metadata batch $lastOffset: creating new metadata image")
         }
         metadataCache.image(newImage)
       } else if (isDebugEnabled) {
-        debug(s"Metadata batch ${lastOffset}: no new metadata image required.")
+        debug(s"Metadata batch $lastOffset: no new metadata image required.")
       }
       if (imageBuilder.hasPartitionChanges) {
         if (isDebugEnabled) {
-          debug(s"Metadata batch ${lastOffset}: applying partition changes")
+          debug(s"Metadata batch $lastOffset: applying partition changes")
         }
         replicaManager.handleMetadataRecords(imageBuilder, lastOffset,
           RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
       } else if (isDebugEnabled) {
-        debug(s"Metadata batch ${lastOffset}: no partition changes found.")
+        debug(s"Metadata batch $lastOffset: no partition changes found.")
       }
       _highestMetadataOffset = lastOffset
       val endNs = time.nanoseconds()
       val deltaUs = TimeUnit.MICROSECONDS.convert(endNs - startNs, TimeUnit.NANOSECONDS)
-      debug(s"Metadata batch ${lastOffset}: advanced highest metadata offset in ${deltaUs} " +
+      debug(s"Metadata batch $lastOffset: advanced highest metadata offset in ${deltaUs} " +
         "microseconds.")
       batchProcessingTimeHist.update(deltaUs)
     }
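The test hook execCommits now wraps its single batch with BatchReader.singleton so tests run through the same HandleCommitsEvent path as production. Below is a self-contained sketch of what such a singleton reader has to provide; this is not Kafka's implementation, and SingleBatchReader is an invented name:

// Yields exactly one batch, then is exhausted; close() has nothing to release.
final class SingleBatchReader[T](batch: T) extends AutoCloseable {
  private var consumed = false

  def hasNext: Boolean = !consumed

  def next(): T = {
    if (consumed) throw new NoSuchElementException("single batch already consumed")
    consumed = true
    batch
  }

  override def close(): Unit = () // in-memory batch: nothing to free
}

HandleCommitsEvent.run() then consumes it with one next() and a close() in a finally, exactly as it would a reader handed over from handleCommit.
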
@@ -28,6 +28,7 @@ import org.apache.kafka.common.metadata.{ConfigRecord, PartitionRecord, RemoveTo
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.metadata.ApiMessageAndVersion
+import org.apache.kafka.raft.BatchReader
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentMatchers._
@@ -39,6 +40,7 @@ import scala.jdk.CollectionConverters._
 
 class BrokerMetadataListenerTest {
 
+  private val leaderEpoch = 5
   private val brokerId = 1
   private val time = new MockTime()
   private val configRepository = new CachedConfigRepository
@@ -82,11 +84,10 @@
   ): Unit = {
     val deleteRecord = new RemoveTopicRecord()
       .setTopicId(topicId)
-    lastMetadataOffset += 1
-    listener.execCommits(lastOffset = lastMetadataOffset, List[ApiMessageAndVersion](
-      new ApiMessageAndVersion(deleteRecord, 0.toShort),
-    ).asJava)
+    applyBatch(List[ApiMessageAndVersion](
+      new ApiMessageAndVersion(deleteRecord, 0.toShort),
+    ))
 
     assertFalse(metadataCache.contains(topic))
     assertEquals(new Properties, configRepository.topicConfig(topic))
 
@@ -108,17 +109,29 @@
     assertEquals(localPartitions, localRemoved.map(_.toTopicPartition).toSet)
   }
 
+  private def applyBatch(
+    records: List[ApiMessageAndVersion]
+  ): Unit = {
+    val baseOffset = lastMetadataOffset + 1
+    lastMetadataOffset += records.size
+    listener.execCommits(new BatchReader.Batch(
+      baseOffset,
+      leaderEpoch,
+      records.asJava
+    ))
+  }
+
   private def createAndAssert(
     topicId: Uuid,
     topic: String,
     topicConfig: Map[String, String],
     numPartitions: Int,
     numBrokers: Int
   ): Set[TopicPartition] = {
-    val records = new java.util.ArrayList[ApiMessageAndVersion]
-    records.add(new ApiMessageAndVersion(new TopicRecord()
+    val records = mutable.ListBuffer.empty[ApiMessageAndVersion]
+    records += new ApiMessageAndVersion(new TopicRecord()
       .setName(topic)
-      .setTopicId(topicId), 0))
+      .setTopicId(topicId), 0)
 
     val localTopicPartitions = mutable.Set.empty[TopicPartition]
     (0 until numPartitions).map { partitionId =>
@@ -133,26 +146,25 @@
         localTopicPartitions.add(new TopicPartition(topic, partitionId))
       }
 
-      records.add(new ApiMessageAndVersion(new PartitionRecord()
+      records += new ApiMessageAndVersion(new PartitionRecord()
         .setTopicId(topicId)
         .setPartitionId(partitionId)
         .setLeader(preferredLeaderId)
         .setLeaderEpoch(0)
         .setPartitionEpoch(0)
         .setReplicas(replicas)
-        .setIsr(replicas), 0))
+        .setIsr(replicas), 0)
     }
 
     topicConfig.forKeyValue { (key, value) =>
-      records.add(new ApiMessageAndVersion(new ConfigRecord()
+      records += new ApiMessageAndVersion(new ConfigRecord()
         .setResourceName(topic)
         .setResourceType(ConfigResource.Type.TOPIC.id())
         .setName(key)
-        .setValue(value), 0))
+        .setValue(value), 0)
     }
 
-    lastMetadataOffset += records.size()
-    listener.execCommits(lastOffset = lastMetadataOffset, records)
+    applyBatch(records.toList)
     assertTrue(metadataCache.contains(topic))
     assertEquals(Some(numPartitions), metadataCache.numPartitions(topic))
     assertEquals(topicConfig, configRepository.topicConfig(topic).asScala)
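A worked example of applyBatch's offset bookkeeping (the values are illustrative, not from the test): the batch's base offset is one past the last applied offset, and the tracked offset advances by the record count, ending up equal to the batch's last offset.

object OffsetBookkeepingExample extends App {
  var lastMetadataOffset = 10L
  val recordCount = 3

  val baseOffset = lastMetadataOffset + 1 // first offset in the batch: 11
  lastMetadataOffset += recordCount       // last offset in the batch: 13

  // The batch covers offsets 11, 12, 13.
  assert(baseOffset == 11L)
  assert(lastMetadataOffset == 13L)
}
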
@@ -616,58 +616,56 @@ private <T> CompletableFuture<T> appendWriteEvent(String name,
 
     class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
 
-        private void handleCommittedBatch(BatchReader.Batch<ApiMessageAndVersion> batch) {
-            long offset = batch.lastOffset();
-            List<ApiMessageAndVersion> messages = batch.records();
-
-            appendControlEvent("handleCommits[" + offset + "]", () -> {
-                if (curClaimEpoch == -1) {
-                    // If the controller is a standby, replay the records that were
-                    // created by the active controller.
-                    if (log.isDebugEnabled()) {
-                        if (log.isTraceEnabled()) {
-                            log.trace("Replaying commits from the active node up to " +
-                                "offset {}: {}.", offset, messages.stream()
-                                .map(ApiMessageAndVersion::toString)
-                                .collect(Collectors.joining(", ")));
-                        } else {
-                            log.debug("Replaying commits from the active node up to " +
-                                "offset {}.", offset);
-                        }
-                    }
-                    for (ApiMessageAndVersion messageAndVersion : messages) {
-                        replay(messageAndVersion.message(), -1, offset);
-                    }
-                } else {
-                    // If the controller is active, the records were already replayed,
-                    // so we don't need to do it here.
-                    log.debug("Completing purgatory items up to offset {}.", offset);
-
-                    // Complete any events in the purgatory that were waiting for this offset.
-                    purgatory.completeUpTo(offset);
-
-                    // Delete all the in-memory snapshots that we no longer need.
-                    // If we are writing a new snapshot, then we need to keep that around;
-                    // otherwise, we should delete up to the current committed offset.
-                    snapshotRegistry.deleteSnapshotsUpTo(
-                        Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
-                }
-                lastCommittedOffset = offset;
-            });
-        }
-
-        @Override
-        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
-            try {
-                while (reader.hasNext()) {
-                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
-                    handleCommittedBatch(batch);
-                }
-            } finally {
-                reader.close();
-            }
-        }
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            appendControlEvent("handleCommits[baseOffset=" + reader.baseOffset() + "]", () -> {
+                try {
+                    boolean isActiveController = curClaimEpoch != -1;
+                    while (reader.hasNext()) {
+                        BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                        long offset = batch.lastOffset();
+                        List<ApiMessageAndVersion> messages = batch.records();
+
+                        if (isActiveController) {
+                            // If the controller is active, the records were already replayed,
+                            // so we don't need to do it here.
+                            log.debug("Completing purgatory items up to offset {}.", offset);
+
+                            // Complete any events in the purgatory that were waiting for this offset.
+                            purgatory.completeUpTo(offset);
+
+                            // Delete all the in-memory snapshots that we no longer need.
+                            // If we are writing a new snapshot, then we need to keep that around;
+                            // otherwise, we should delete up to the current committed offset.
+                            snapshotRegistry.deleteSnapshotsUpTo(
+                                Math.min(offset, snapshotGeneratorManager.snapshotEpoch()));
+
+                        } else {
+                            // If the controller is a standby, replay the records that were
+                            // created by the active controller.
+                            if (log.isDebugEnabled()) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("Replaying commits from the active node up to " +
+                                        "offset {}: {}.", offset, messages.stream()
+                                        .map(ApiMessageAndVersion::toString)
+                                        .collect(Collectors.joining(", ")));
+                                } else {
+                                    log.debug("Replaying commits from the active node up to " +
+                                        "offset {}.", offset);
+                                }
+                            }
+                            for (ApiMessageAndVersion messageAndVersion : messages) {
+                                replay(messageAndVersion.message(), -1, offset);
+                            }
+                        }
+                        lastCommittedOffset = offset;
+                    }
+                } finally {
+                    reader.close();
+                }
+            });
+        }
 
         @Override
         public void handleLeaderChange(LeaderAndEpoch newLeader) {
             if (newLeader.isLeader(nodeId)) {
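Design note on the controller side: handleCommit now schedules a single control event per commit callback, named with the reader's baseOffset rather than a per-batch last offset, and closes the reader in a finally inside that event. The active path (purgatory completion, snapshot cleanup) and the standby path (record replay) thus share one cleanup point, and the reader is released even if a replay throws.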
