From 106ab44b10eb637c2f46ff3755894c287f97711e Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 24 Mar 2021 14:19:22 -0400 Subject: [PATCH 1/2] KAFKA-12474: Handle failure to write new session keys gracefully --- .../distributed/DistributedHerder.java | 20 +++-- .../distributed/DistributedHerderTest.java | 82 +++++++++++++++++++ 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index b4dfb4de6e8b7..b7f67529160f5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -363,10 +363,16 @@ public void tick() { if (checkForKeyRotation(now)) { log.debug("Distributing new session key"); keyExpiration = Long.MAX_VALUE; - configBackingStore.putSessionKey(new SessionKey( - keyGenerator.generateKey(), - now - )); + try { + configBackingStore.putSessionKey(new SessionKey( + keyGenerator.generateKey(), + now + )); + } catch (Exception e) { + log.warn("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); + canReadConfigs = false; + return; + } } // Process any external requests @@ -1173,7 +1179,11 @@ private boolean handleRebalanceCompleted() { * @return true if successful, false if timed out */ private boolean readConfigToEnd(long timeoutMs) { - log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); + if (configState.offset() < assignment.offset()) { + log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); + } else { + log.info("Reading to end of config log; current config state offset: {}", configState.offset()); + } try { configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS); configState = configBackingStore.snapshot(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index e31a03f11b614..65ec89c8cf99b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -23,11 +23,13 @@ import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.MockConnectMetrics; +import org.apache.kafka.connect.runtime.SessionKey; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; @@ -67,6 +69,7 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import javax.crypto.SecretKey; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -84,8 +87,10 @@ import static java.util.Collections.singletonList; import static javax.ws.rs.core.Response.Status.FORBIDDEN; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.newCapture; import static org.junit.Assert.assertEquals; @@ -2176,6 +2181,83 @@ public void testPutTaskConfigsValidRequiredSignature() { PowerMock.verifyAll(); } + @Test + public void testFailedToWriteSessionKey() throws Exception { + // First tick -- after joining the group, we try to write a new + // session key to the config topic, and fail + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2); + expectRebalance(1, Collections.emptyList(), Collections.emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + configBackingStore.putSessionKey(anyObject(SessionKey.class)); + EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!")); + + // Second tick -- we read to the end of the config topic first, + // then ensure we're still active in the group + // then try a second time to write a new session key, + // then finally begin polling for group activity + expectPostRebalanceCatchup(SNAPSHOT); + member.ensureActive(); + PowerMock.expectLastCall(); + configBackingStore.putSessionKey(anyObject(SessionKey.class)); + EasyMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + herder.tick(); + + PowerMock.verifyAll(); + } + + @Test + public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception { + SecretKey secretKey = EasyMock.niceMock(SecretKey.class); + EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT); + EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]); + SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds()); + ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), + TASK_CONFIGS_MAP, Collections.emptySet()); + + // First tick -- after joining the group, we try to write a new session key to + // the config topic, and fail (in this case, we're trying to simulate that we've + // actually written the key successfully, but haven't been able to read it back + // from the config topic, so to the herder it looks the same as if it'd just failed + // to write the key) + EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2); + expectRebalance(1, Collections.emptyList(), Collections.emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + configBackingStore.putSessionKey(anyObject(SessionKey.class)); + EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!")); + + // Second tick -- we read to the end of the config topic first, and pick up + // the session key that we were able to write the last time, + // then ensure we're still active in the group + // then finally begin polling for group activity + // Importantly, we do not try to write a new session key this time around + configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + EasyMock.expectLastCall().andAnswer(() -> { + configUpdateListener.onSessionKeyUpdate(sessionKey); + return null; + }); + EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithSessionKey); + member.ensureActive(); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(secretKey); + + herder.tick(); + herder.tick(); + + PowerMock.verifyAll(); + } + @Test public void testKeyExceptionDetection() { assertFalse(herder.isPossibleExpiredKeyException( From bc167b2ccb90d14d4b0fb8c0ba7eee096d7bc14c Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Mon, 29 Mar 2021 22:57:31 -0400 Subject: [PATCH 2/2] KAFKA-12474: Address code review --- .../kafka/connect/runtime/distributed/DistributedHerder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index b7f67529160f5..b3b9ba9b94d2e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -369,7 +369,7 @@ public void tick() { now )); } catch (Exception e) { - log.warn("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); + log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying"); canReadConfigs = false; return; }