From 543f65cf38c79a61e6c5b8d6f079c7e86aa67d1c Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Wed, 18 Dec 2024 11:51:34 -0800 Subject: [PATCH] [server] Reset current version replica to ERROR on ingestion error (#1387) For hybrid stores whose current version is ingesting the real time data from a real time topic, if there is any exception/error thrown during ingestion they would log error message, sit still and stop ingestion, resulting in stale replicas. The mitigation mechanism is for oncall to manually restart every impacted cluster, which is very labor-intensive. This PR tries to resolve that by marking such replicas to ERROR state, which later in a controller task to reset such error replicas, will try to initiate new state transition of those replicas which can possibly mitigate such stale replica issue. --------- Co-authored-by: Sourav Maji --- .../StateModelIngestionProgressNotifier.java | 3 +- .../kafka/consumer/StoreIngestionTask.java | 39 +++++++++++++------ .../consumer/StoreIngestionTaskTest.java | 36 +++++++++++++++-- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java index b16ffe7bfe2..caa3e6e003f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java @@ -5,6 +5,7 @@ import com.linkedin.davinci.kafka.consumer.StoreIngestionService; import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.exceptions.VeniceTimeoutException; import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.Map; @@ -51,7 +52,7 @@ void waitConsumptionCompleted( // Report ingestion_failure String storeName = Version.parseStoreFromKafkaTopicName(resourceName); storeIngestionService.recordIngestionFailure(storeName); - VeniceException veniceException = new VeniceException(errorMsg); + VeniceTimeoutException veniceException = new VeniceTimeoutException(errorMsg); storeIngestionService.getStoreIngestionTask(resourceName).reportError(errorMsg, partitionId, veniceException); } stateModelToIngestionCompleteFlagMap.remove(stateModelId); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 65d7c95fce2..2483370b19f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1405,8 +1405,8 @@ private void processIngestionException() { /** * Special handling for current version when encountering {@link MemoryLimitExhaustedException}. */ - if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class) - && isCurrentVersion.getAsBoolean()) { + if (isCurrentVersion.getAsBoolean() + && ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) { LOGGER.warn( "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and" + " resume the consumption after killing ingestion tasks for non current versions"); @@ -1439,10 +1439,23 @@ private void processIngestionException() { // DaVinci is always a follower. subscribePartition(pubSubTopicPartition, false); } + } else if (isCurrentVersion.getAsBoolean() && resetErrorReplicaEnabled && !isDaVinciClient) { + // marking its replica status ERROR which will later be reset by the controller + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition))); + LOGGER.error( + "Marking current version replica status to ERROR for replica: {}", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + // No need to reset again, clearing out the exception. + partitionIngestionExceptionList.set(exceptionPartition, null); } else { if (!partitionConsumptionState.isCompletionReported()) { reportError(partitionException.getMessage(), exceptionPartition, partitionException); - } else { LOGGER.error( "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", @@ -1790,6 +1803,16 @@ private void reportError( } else { ingestionNotificationDispatcher.reportError(pcsList, message, consumerEx); } + // Set the replica state to ERROR so that the controller can attempt to reset the partition. + if (isCurrentVersion.getAsBoolean() && !isDaVinciClient && resetErrorReplicaEnabled + && !(consumerEx instanceof VeniceTimeoutException)) { + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, partitionId))); + } } private void internalClose(boolean doFlush) { @@ -4222,16 +4245,8 @@ public void reportError(String message, int userPartition, Exception e) { if (partitionConsumptionStateMap.containsKey(userPartition)) { pcsList.add(partitionConsumptionStateMap.get(userPartition)); } + reportError(pcsList, userPartition, message, e); ingestionNotificationDispatcher.reportError(pcsList, message, e); - // Set the replica state to ERROR so that the controller can attempt to reset the partition. - if (!isDaVinciClient && resetErrorReplicaEnabled) { - zkHelixAdmin.get() - .setPartitionsToError( - serverConfig.getClusterName(), - hostName, - kafkaVersionTopic, - Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, userPartition))); - } } public boolean isActiveActiveReplicationEnabled() { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index ad24ec45c85..d894c8b33ba 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -27,6 +27,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX; +import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH; import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; @@ -46,6 +47,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; @@ -385,6 +387,8 @@ public static Object[][] sortedInputAndAAConfigProvider() { private Runnable runnableForKillNonCurrentVersion; + private ZKHelixAdmin zkHelixAdmin; + private static final int PARTITION_COUNT = 10; private static final Set ALL_PARTITIONS = new HashSet<>(); static { @@ -858,6 +862,8 @@ private void runTest( Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); + zkHelixAdmin = mock(ZKHelixAdmin.class); + doNothing().when(zkHelixAdmin).setPartitionsToError(anyString(), anyString(), anyString(), anyList()); storeIngestionTaskUnderTest = spy( ingestionTaskFactory.getNewIngestionTask( storageService, @@ -870,7 +876,7 @@ private void runTest( false, Optional.empty(), recordTransformerFunction, - Lazy.of(() -> mock(ZKHelixAdmin.class)))); + Lazy.of(() -> zkHelixAdmin))); Future testSubscribeTaskFuture = null; try { @@ -2830,6 +2836,7 @@ private VeniceServerConfig buildVeniceServerConfig(Map extraProp propertyBuilder.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 1000); propertyBuilder.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 1000); propertyBuilder.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); + propertyBuilder.put(SERVER_RESET_ERROR_REPLICA_ENABLED, true); extraProperties.forEach(propertyBuilder::put); Map> kafkaClusterMap = new HashMap<>(); @@ -4371,8 +4378,7 @@ public void testIngestionTaskForNonCurrentVersionShouldFailWhenEncounteringMemor } @Test - public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEncounteringMemoryLimitException() - throws Exception { + public void testIngestionTaskCurrentVersionKillOngoingPushOnMemoryLimitException() throws Exception { doThrow(new MemoryLimitExhaustedException("mock exception")).doNothing() .when(mockAbstractStorageEngine) .put(anyInt(), any(), (ByteBuffer) any()); @@ -4391,6 +4397,30 @@ public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEnco }, AA_OFF); } + @Test + public void testIngestionTaskForCurrentVersionResetException() throws Exception { + doThrow(new VeniceException("mock exception")).doNothing() + .when(mockAbstractStorageEngine) + .put(anyInt(), any(), (ByteBuffer) any()); + isCurrentVersion = () -> true; + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null).get(); + localVeniceWriter.broadcastEndOfPush(new HashMap<>()); + + StoreIngestionTaskTestConfig testConfig = + new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> { + // pcs.completionReported(); + verify(mockAbstractStorageEngine, timeout(10000).times(1)).put(eq(PARTITION_FOO), any(), (ByteBuffer) any()); + Utils.sleep(1000); + verify(zkHelixAdmin, atLeast(1)).setPartitionsToError(anyString(), anyString(), anyString(), anyList()); + }, AA_OFF); + testConfig.setStoreVersionConfigOverride(configOverride -> { + doReturn(true).when(configOverride).isResetErrorReplicaEnabled(); + }); + runTest(testConfig); + } + @Test public void testShouldProduceToVersionTopic() throws Exception { runTest(Collections.singleton(PARTITION_FOO), () -> {