KAFKA-13173; Ensure KRaft controller handles concurrent broker expirations correctly (apache#11191)

Prior to this patch, the controller did not accumulate ISR/leader changes correctly when multiple brokers' sessions expired at the same time. This patch fixes the problem by having the controller handle one expiration at a time.

Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
niket-goel authored and Ralph Debusmann committed Dec 22, 2021
1 parent 79d2b14 commit 28d429b
Showing 8 changed files with 227 additions and 30 deletions.
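Before the diff, here is a minimal, self-contained sketch of the behavior the patch moves to (the Partition class, fenceBroker method, and leader-election rule below are illustrative stand-ins, not the controller's actual classes or logic): fencing is applied to one expired broker at a time, so each broker's ISR and leader changes are visible before the next stale broker is handled.

// Illustrative sketch only; none of these names come from the Kafka codebase.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OneAtATimeFencingSketch {

    // A toy partition: replica set, current ISR, and current leader (-1 = none).
    static class Partition {
        final List<Integer> replicas;
        final List<Integer> isr;
        int leader;

        Partition(List<Integer> replicas, List<Integer> isr, int leader) {
            this.replicas = new ArrayList<>(replicas);
            this.isr = new ArrayList<>(isr);
            this.leader = leader;
        }
    }

    // Fence a single broker: shrink the ISR and, if that broker was the leader,
    // elect a new leader from the remaining ISR. The change is applied immediately,
    // so the next fencing operates on the updated state.
    static void fenceBroker(Partition partition, int brokerId) {
        partition.isr.remove(Integer.valueOf(brokerId));
        if (partition.leader == brokerId) {
            partition.leader = partition.isr.isEmpty() ? -1 : partition.isr.get(0);
        }
        System.out.println("Fenced broker " + brokerId
            + " -> isr=" + partition.isr + ", leader=" + partition.leader);
    }

    public static void main(String[] args) {
        Partition foo0 = new Partition(
            Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 3), 1);

        // Brokers 1 and 2 expire at the same time. Handling them one at a time
        // means the second fencing already sees isr=[2, 3], leader=2, and can
        // move leadership to broker 3.
        for (int staleBroker : Arrays.asList(1, 2)) {
            fenceBroker(foo0, staleBroker);
        }
    }
}

Computing the changes for brokers 1 and 2 in one batch against the original isr=[1, 2, 3] is the kind of bookkeeping error the commit message describes; handling one expiration at a time avoids it.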
@@ -24,7 +24,6 @@
import org.apache.kafka.metadata.UsableBroker;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -420,22 +419,22 @@ long nextCheckTimeNs() {
}

/**
* Find the stale brokers which haven't heartbeated in a long time, and which need to
* be fenced.
* Check whether the broker with the oldest heartbeat has already violated the
* sessionTimeoutNs timeout and needs to be fenced.
*
* @return A list of node IDs.
* @return An Optional containing the node id of a stale broker, if any.
*/
List<Integer> findStaleBrokers() {
List<Integer> nodes = new ArrayList<>();
Optional<Integer> findOneStaleBroker() {
BrokerHeartbeatStateIterator iterator = unfenced.iterator();
while (iterator.hasNext()) {
if (iterator.hasNext()) {
BrokerHeartbeatState broker = iterator.next();
if (hasValidSession(broker)) {
break;
// The unfenced list is sorted on last contact time from each
// broker. If the first broker is not stale, then none is.
if (!hasValidSession(broker)) {
return Optional.of(broker.id);
}
nodes.add(broker.id);
}
return nodes;
return Optional.empty();
}

/**
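The new findOneStaleBroker relies on the unfenced collection being ordered by last contact time, so only the broker with the oldest heartbeat has to be examined, and the caller is expected to fence that broker before asking again. A rough, self-contained sketch of that contract (the class, field, and method names below are made up for illustration; a priority queue stands in for the manager's sorted list):

import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;

public class OldestHeartbeatSketch {

    static class BrokerState {
        final int id;
        final long lastContactNs;

        BrokerState(int id, long lastContactNs) {
            this.id = id;
            this.lastContactNs = lastContactNs;
        }
    }

    // Ordered by last contact time, oldest heartbeat first.
    private final PriorityQueue<BrokerState> unfenced =
        new PriorityQueue<>(Comparator.comparingLong((BrokerState b) -> b.lastContactNs));
    private final long sessionTimeoutNs;

    OldestHeartbeatSketch(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    // Record a heartbeat: drop any previous entry for the broker and re-insert it
    // with the new contact time.
    void touch(int id, long nowNs) {
        unfenced.removeIf(b -> b.id == id);
        unfenced.add(new BrokerState(id, nowNs));
    }

    // Only the broker with the oldest heartbeat needs checking: if its session is
    // still valid, every more recently heard-from broker's session is valid too.
    Optional<Integer> findOneStaleBroker(long nowNs) {
        BrokerState oldest = unfenced.peek();
        if (oldest != null && nowNs - oldest.lastContactNs > sessionTimeoutNs) {
            return Optional.of(oldest.id);
        }
        return Optional.empty();
    }

    // Callers fence the returned broker before asking again; otherwise the same
    // broker would keep being returned.
    void fence(int id) {
        unfenced.removeIf(b -> b.id == id);
    }
}

The updated testFindOneStaleBroker below exercises exactly this drain pattern: find one stale broker, fence it, then ask again until the result is empty.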
@@ -17,6 +17,8 @@

package org.apache.kafka.controller;

import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -161,6 +163,14 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
return brokerRegistrations;
}

Set<Integer> fencedBrokerIds() {
return brokerRegistrations.values()
.stream()
.filter(BrokerRegistration::fenced)
.map(BrokerRegistration::id)
.collect(Collectors.toSet());
}

/**
* Process an incoming broker registration request.
*/
@@ -858,7 +858,9 @@ private void rescheduleMaybeFenceStaleBrokers() {
return;
}
scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
ControllerResult<Void> result = replicationControl.maybeFenceStaleBrokers();
ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
// The following call ensures that if multiple brokers are currently stale,
// fencing for them is scheduled immediately.
rescheduleMaybeFenceStaleBrokers();
return result;
});
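The comment above describes a self-rescheduling pattern: each deferred write event fences at most one stale broker and then re-arms the check. A rough, standalone analogy (not the controller's event queue or the scheduleDeferredWriteEvent API; all names below are invented), using a single-threaded scheduler:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SelfReschedulingFencingSketch {

    private final ScheduledExecutorService eventQueue =
        Executors.newSingleThreadScheduledExecutor();
    private final Deque<Integer> staleBrokers = new ArrayDeque<>();

    SelfReschedulingFencingSketch(Integer... initiallyStale) {
        for (Integer id : initiallyStale) {
            staleBrokers.add(id);
        }
    }

    // Each event fences at most one broker and then re-arms the check, so several
    // simultaneously stale brokers are handled as a chain of small events whose
    // effects are individually visible, rather than as one batch.
    void scheduleMaybeFenceOneStaleBroker() {
        eventQueue.schedule(() -> {
            Integer brokerId = staleBrokers.poll();
            if (brokerId != null) {
                System.out.println("Fencing broker " + brokerId);
            }
            if (!staleBrokers.isEmpty()) {
                scheduleMaybeFenceOneStaleBroker(); // more stale brokers: run again right away
            } else {
                eventQueue.shutdown();
            }
        }, 0, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        SelfReschedulingFencingSketch sketch = new SelfReschedulingFencingSketch(2, 3, 4);
        sketch.scheduleMaybeFenceOneStaleBroker();
        sketch.eventQueue.awaitTermination(5, TimeUnit.SECONDS);
    }
}

In the controller itself, the recursive rescheduleMaybeFenceStaleBrokers() call shown above plays this re-arming role, so fencing events keep firing while stale brokers remain.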
@@ -925,18 +925,25 @@ public ControllerResult<Void> unregisterBroker(int brokerId) {
return ControllerResult.of(records, null);
}

ControllerResult<Void> maybeFenceStaleBrokers() {
ControllerResult<Void> maybeFenceOneStaleBroker() {
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
for (int brokerId : staleBrokers) {
heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
// Even though multiple brokers can go stale at the same time, we process
// fencing one at a time so that the effect of fencing each broker is visible
// to the system before the next one is processed.
log.info("Fencing broker {} because its session has timed out.", brokerId);
handleBrokerFenced(brokerId, records);
heartbeatManager.fence(brokerId);
}
});
return ControllerResult.of(records, null);
}

// Visible for testing
Boolean isBrokerUnfenced(int brokerId) {
return clusterControl.unfenced(brokerId);
}

ControllerResult<List<CreatePartitionsTopicResult>>
createPartitions(List<CreatePartitionsTopic> topics) {
List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -17,7 +17,6 @@

package org.apache.kafka.controller;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -78,7 +77,7 @@ public void testHasValidSession() {
}

@Test
public void testFindStaleBrokers() {
public void testFindOneStaleBroker() {
BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
MockTime time = (MockTime) manager.time();
assertFalse(manager.hasValidSession(0));
@@ -93,22 +92,24 @@ public void testFindStaleBrokers() {
assertEquals(1, iter.next().id());
assertEquals(2, iter.next().id());
assertFalse(iter.hasNext());
assertEquals(Collections.emptyList(), manager.findStaleBrokers());
assertEquals(Optional.empty(), manager.findOneStaleBroker());

time.sleep(5);
assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
assertEquals(Optional.of(0), manager.findOneStaleBroker());
manager.fence(0);
assertEquals(Collections.emptyList(), manager.findStaleBrokers());
assertEquals(Optional.empty(), manager.findOneStaleBroker());
iter = manager.unfenced().iterator();
assertEquals(1, iter.next().id());
assertEquals(2, iter.next().id());
assertFalse(iter.hasNext());

time.sleep(20);
assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
assertEquals(Optional.of(1), manager.findOneStaleBroker());
manager.fence(1);
assertEquals(Optional.of(2), manager.findOneStaleBroker());
manager.fence(2);
assertEquals(Collections.emptyList(), manager.findStaleBrokers());

assertEquals(Optional.empty(), manager.findOneStaleBroker());
iter = manager.unfenced().iterator();
assertFalse(iter.hasNext());
}
@@ -125,17 +126,20 @@ public void testNextCheckTimeNs() {
manager.touch(2, false, 0);
time.sleep(1);
manager.touch(3, false, 0);
assertEquals(Collections.emptyList(), manager.findStaleBrokers());
assertEquals(Optional.empty(), manager.findOneStaleBroker());
assertEquals(10_000_000, manager.nextCheckTimeNs());
time.sleep(7);
assertEquals(10_000_000, manager.nextCheckTimeNs());
assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
assertEquals(Optional.of(0), manager.findOneStaleBroker());
manager.fence(0);
assertEquals(12_000_000, manager.nextCheckTimeNs());

time.sleep(3);
assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
assertEquals(Optional.of(1), manager.findOneStaleBroker());
manager.fence(1);
assertEquals(Optional.of(2), manager.findOneStaleBroker());
manager.fence(2);

assertEquals(14_000_000, manager.nextCheckTimeNs());
}

@@ -79,6 +79,7 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@@ -173,6 +174,89 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
}

@Test
public void testFenceMultipleBrokers() throws Throwable {
List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5);
short replicationFactor = 5;
long sessionTimeoutMillis = 1000;

try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutMillis));
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
Map<Integer, Long> brokerEpochs = new HashMap<>();

for (Integer brokerId : allBrokers) {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId("06B-K3N1TBCNYFgruEVP0Q").
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
}

// Brokers are only registered and should still be fenced
allBrokers.forEach(brokerId -> {
assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});

// Unfence all brokers and create a topic foo
sendBrokerheartbeat(active, allBrokers, brokerEpochs);
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
new CreatableTopicCollection(Collections.singleton(
new CreatableTopic().setName("foo").setNumPartitions(1).
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData = active.createTopics(createTopicsRequestData).get();
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();

// Fence some of the brokers
TestUtils.waitForCondition(() -> {
sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
for (Integer brokerId : brokersToFence) {
if (active.replicationControl().isBrokerUnfenced(brokerId)) {
return false;
}
}
return true;
}, sessionTimeoutMillis * 3,
"Fencing of brokers did not process within expected time"
);

// Send another heartbeat to the brokers we want to keep alive
sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);

// At this point only the brokers we want fenced should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});

// Verify the ISR and leader for the topic partition
int[] expectedIsr = {1};
int[] isrFoo = active.replicationControl().getPartition(topicIdFoo, 0).isr;

assertTrue(Arrays.equals(isrFoo, expectedIsr),
"The ISR for topic foo was " + Arrays.toString(isrFoo) +
". It is expected to be " + Arrays.toString(expectedIsr));

int fooLeader = active.replicationControl().getPartition(topicIdFoo, 0).leader;
assertEquals(expectedIsr[0], fooLeader);
}
}

@Test
public void testUnregisterBroker() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
@@ -761,4 +845,24 @@ private Map<Integer, Long> registerBrokers(QuorumController controller, int numB
return brokerEpochs;
}

private void sendBrokerheartbeat(
QuorumController controller,
List<Integer> brokers,
Map<Integer, Long> brokerEpochs
) throws Exception {
if (brokers.isEmpty()) {
return;
}
for (Integer brokerId : brokers) {
BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(
new BrokerHeartbeatRequestData()
.setWantFence(false)
.setBrokerEpoch(brokerEpochs.get(brokerId))
.setBrokerId(brokerId)
.setCurrentMetadataOffset(100000)
).get();
assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
}
}

}
@@ -17,6 +17,11 @@

package org.apache.kafka.controller;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.controller.QuorumController.Builder;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.test.TestUtils;
@@ -36,16 +41,29 @@ public class QuorumControllerTestEnv implements AutoCloseable {
private final List<QuorumController> controllers;
private final LocalLogManagerTestEnv logEnv;

public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
Consumer<QuorumController.Builder> builderConsumer)
throws Exception {
public QuorumControllerTestEnv(
LocalLogManagerTestEnv logEnv,
Consumer<QuorumController.Builder> builderConsumer
) throws Exception {
this(logEnv, builderConsumer, Optional.empty());
}

public QuorumControllerTestEnv(
LocalLogManagerTestEnv logEnv,
Consumer<Builder> builderConsumer,
Optional<Long> sessionTimeoutMillis
) throws Exception {
this.logEnv = logEnv;
int numControllers = logEnv.logManagers().size();
this.controllers = new ArrayList<>(numControllers);
try {
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i);
builder.setRaftClient(logEnv.logManagers().get(i));
if (sessionTimeoutMillis.isPresent()) {
builder.setSessionTimeoutNs(NANOSECONDS.convert(
sessionTimeoutMillis.get(), TimeUnit.MILLISECONDS));
}
builderConsumer.accept(builder);
this.controllers.add(builder.build());
}
