Skip to content

Commit

Permalink
KAFKA-12474: Handle failure to write new session keys gracefully (#10396
Browse files Browse the repository at this point in the history
)

If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end.

At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader).

Verified with new unit tests for both cases (failure to write, failure to read back after write).

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
  • Loading branch information
C0urante authored and rhauch committed Apr 1, 2021
1 parent ecd7750 commit d91d64f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,16 @@ public void tick() {
if (checkForKeyRotation(now)) {
log.debug("Distributing new session key");
keyExpiration = Long.MAX_VALUE;
configBackingStore.putSessionKey(new SessionKey(
keyGenerator.generateKey(),
now
));
try {
configBackingStore.putSessionKey(new SessionKey(
keyGenerator.generateKey(),
now
));
} catch (Exception e) {
log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying");
canReadConfigs = false;
return;
}
}

// Process any external requests
Expand Down Expand Up @@ -1173,7 +1179,11 @@ private boolean handleRebalanceCompleted() {
* @return true if successful, false if timed out
*/
private boolean readConfigToEnd(long timeoutMs) {
log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
if (configState.offset() < assignment.offset()) {
log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
} else {
log.info("Reading to end of config log; current config state offset: {}", configState.offset());
}
try {
configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
configState = configBackingStore.snapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
Expand Down Expand Up @@ -67,6 +69,7 @@
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import javax.crypto.SecretKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -84,8 +87,10 @@
import static java.util.Collections.singletonList;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -2176,6 +2181,83 @@ public void testPutTaskConfigsValidRequiredSignature() {
PowerMock.verifyAll();
}

@Test
public void testFailedToWriteSessionKey() throws Exception {
// First tick -- after joining the group, we try to write a new
// session key to the config topic, and fail
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
expectRebalance(1, Collections.emptyList(), Collections.emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
configBackingStore.putSessionKey(anyObject(SessionKey.class));
EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));

// Second tick -- we read to the end of the config topic first,
// then ensure we're still active in the group
// then try a second time to write a new session key,
// then finally begin polling for group activity
expectPostRebalanceCatchup(SNAPSHOT);
member.ensureActive();
PowerMock.expectLastCall();
configBackingStore.putSessionKey(anyObject(SessionKey.class));
EasyMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();

PowerMock.replayAll();

herder.tick();
herder.tick();

PowerMock.verifyAll();
}

@Test
public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
TASK_CONFIGS_MAP, Collections.emptySet());

// First tick -- after joining the group, we try to write a new session key to
// the config topic, and fail (in this case, we're trying to simulate that we've
// actually written the key successfully, but haven't been able to read it back
// from the config topic, so to the herder it looks the same as if it'd just failed
// to write the key)
EasyMock.expect(member.memberId()).andStubReturn("leader");
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
expectRebalance(1, Collections.emptyList(), Collections.emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
configBackingStore.putSessionKey(anyObject(SessionKey.class));
EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));

// Second tick -- we read to the end of the config topic first, and pick up
// the session key that we were able to write the last time,
// then ensure we're still active in the group
// then finally begin polling for group activity
// Importantly, we do not try to write a new session key this time around
configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall().andAnswer(() -> {
configUpdateListener.onSessionKeyUpdate(sessionKey);
return null;
});
EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithSessionKey);
member.ensureActive();
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();

PowerMock.replayAll(secretKey);

herder.tick();
herder.tick();

PowerMock.verifyAll();
}

@Test
public void testKeyExceptionDetection() {
assertFalse(herder.isPossibleExpiredKeyException(
Expand Down

0 comments on commit d91d64f

Please sign in to comment.