From 694abe084dc0fa790af66b6645d976b6298f588f Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Tue, 10 Dec 2024 16:24:41 -0800 Subject: [PATCH 1/8] [server] Set current version replicas to ERROR state on hitting ingestion exception --- .../davinci/kafka/consumer/StoreIngestionTask.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 65d7c95fce..51ce027b84 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1442,7 +1442,17 @@ private void processIngestionException() { } else { if (!partitionConsumptionState.isCompletionReported()) { reportError(partitionException.getMessage(), exceptionPartition, partitionException); - + } else if (resetErrorReplicaEnabled && !isDaVinciClient) { + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition))); + LOGGER.error( + "Marking replica status to ERROR for replica: {}", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); } else { LOGGER.error( "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", From c49228f6d45409309277e3411ace717a211dab2e Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Wed, 11 Dec 2024 16:36:13 -0800 Subject: [PATCH 2/8] [server] Set current version replicas to ERROR state on hitting ingestion exception --- .../kafka/consumer/StoreIngestionTask.java | 87 ++++++++++--------- .../consumer/StoreIngestionTaskTest.java | 36 +++++++- 2 files changed, 79 insertions(+), 44 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 51ce027b84..c13bd5b49b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1405,54 +1405,59 @@ private void processIngestionException() { /** * Special handling for current version when encountering {@link MemoryLimitExhaustedException}. */ - if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class) - && isCurrentVersion.getAsBoolean()) { - LOGGER.warn( - "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and" - + " resume the consumption after killing ingestion tasks for non current versions"); - /** - * Pause topic consumption to avoid more damage. - * We can't unsubscribe it since in some scenario, all the partitions can be unsubscribed, and the ingestion task - * will end. Even later on, there are avaiable memory space, we can't resume the ingestion task. - */ - pauseConsumption(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber()); - LOGGER.info( - "Memory limit reached. Pausing consumption of topic-partition: {}", - Utils.getReplicaId( - pubSubTopicPartition.getPubSubTopic().getName(), - pubSubTopicPartition.getPartitionNumber())); - runnableForKillIngestionTasksForNonCurrentVersions.run(); - if (storageEngine.hasMemorySpaceLeft()) { - unSubscribePartition(pubSubTopicPartition, false); + if (isCurrentVersion.getAsBoolean()) { + if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) { + LOGGER.warn( + "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and" + + " resume the consumption after killing ingestion tasks for non current versions"); /** - * DaVinci ingestion hits memory limit and we would like to retry it in the following way: - * 1. Kill the ingestion tasks for non-current versions. - * 2. Reopen the database since the current database in a bad state, where it can't write or sync even - * there are rooms (bug in SSTFileManager implementation in RocksDB). Reopen will drop the not-yet-synced - * memtable unfortunately. - * 3. Resubscribe the affected partition. + * Pause topic consumption to avoid more damage. + * We can't unsubscribe it since in some scenario, all the partitions can be unsubscribed, and the ingestion task + * will end. Even later on, there are avaiable memory space, we can't resume the ingestion task. */ + pauseConsumption( + pubSubTopicPartition.getPubSubTopic().getName(), + pubSubTopicPartition.getPartitionNumber()); LOGGER.info( - "Ingestion for topic-partition: {} can resume since more space has been reclaimed.", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition)); - storageEngine.reopenStoragePartition(exceptionPartition); - // DaVinci is always a follower. - subscribePartition(pubSubTopicPartition, false); + "Memory limit reached. Pausing consumption of topic-partition: {}", + Utils.getReplicaId( + pubSubTopicPartition.getPubSubTopic().getName(), + pubSubTopicPartition.getPartitionNumber())); + runnableForKillIngestionTasksForNonCurrentVersions.run(); + if (storageEngine.hasMemorySpaceLeft()) { + unSubscribePartition(pubSubTopicPartition, false); + /** + * DaVinci ingestion hits memory limit and we would like to retry it in the following way: + * 1. Kill the ingestion tasks for non-current versions. + * 2. Reopen the database since the current database in a bad state, where it can't write or sync even + * there are rooms (bug in SSTFileManager implementation in RocksDB). Reopen will drop the not-yet-synced + * memtable unfortunately. + * 3. Resubscribe the affected partition. + */ + LOGGER.info( + "Ingestion for topic-partition: {} can resume since more space has been reclaimed.", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition)); + storageEngine.reopenStoragePartition(exceptionPartition); + // DaVinci is always a follower. + subscribePartition(pubSubTopicPartition, false); + } + } else { + if (resetErrorReplicaEnabled && !isDaVinciClient) { + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition))); + LOGGER.error( + "Marking current version replica status to ERROR for replica: {}", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + } } } else { if (!partitionConsumptionState.isCompletionReported()) { reportError(partitionException.getMessage(), exceptionPartition, partitionException); - } else if (resetErrorReplicaEnabled && !isDaVinciClient) { - zkHelixAdmin.get() - .setPartitionsToError( - serverConfig.getClusterName(), - hostName, - kafkaVersionTopic, - Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition))); - LOGGER.error( - "Marking replica status to ERROR for replica: {}", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), - partitionException); } else { LOGGER.error( "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 1d2e117d42..04019a8b6f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -27,6 +27,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX; +import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH; import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; @@ -46,6 +47,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anyString; @@ -385,6 +387,8 @@ public static Object[][] sortedInputAndAAConfigProvider() { private Runnable runnableForKillNonCurrentVersion; + private ZKHelixAdmin zkHelixAdmin; + private static final int PARTITION_COUNT = 10; private static final Set ALL_PARTITIONS = new HashSet<>(); static { @@ -858,6 +862,8 @@ private void runTest( Properties kafkaProps = new Properties(); kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer()); + zkHelixAdmin = mock(ZKHelixAdmin.class); + doNothing().when(zkHelixAdmin).setPartitionsToError(anyString(), anyString(), anyString(), anyList()); storeIngestionTaskUnderTest = spy( ingestionTaskFactory.getNewIngestionTask( storageService, @@ -870,7 +876,7 @@ private void runTest( false, Optional.empty(), recordTransformerFunction, - Lazy.of(() -> mock(ZKHelixAdmin.class)))); + Lazy.of(() -> zkHelixAdmin))); Future testSubscribeTaskFuture = null; try { @@ -2830,6 +2836,7 @@ private VeniceServerConfig buildVeniceServerConfig(Map extraProp propertyBuilder.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 1000); propertyBuilder.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 1000); propertyBuilder.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true); + propertyBuilder.put(SERVER_RESET_ERROR_REPLICA_ENABLED, true); extraProperties.forEach(propertyBuilder::put); Map> kafkaClusterMap = new HashMap<>(); @@ -4371,8 +4378,7 @@ public void testIngestionTaskForNonCurrentVersionShouldFailWhenEncounteringMemor } @Test - public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEncounteringMemoryLimitException() - throws Exception { + public void testIngestionTaskCurrentVersionKillOngoingPushOnMemoryLimitException() throws Exception { doThrow(new MemoryLimitExhaustedException("mock exception")).doNothing() .when(mockAbstractStorageEngine) .put(anyInt(), any(), (ByteBuffer) any()); @@ -4391,6 +4397,30 @@ public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEnco }, AA_OFF); } + @Test + public void testIngestionTaskForCurrentVersionResetException() throws Exception { + doThrow(new VeniceException("mock exception")).doNothing() + .when(mockAbstractStorageEngine) + .put(anyInt(), any(), (ByteBuffer) any()); + isCurrentVersion = () -> true; + + localVeniceWriter.broadcastStartOfPush(new HashMap<>()); + localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null).get(); + localVeniceWriter.broadcastEndOfPush(new HashMap<>()); + + StoreIngestionTaskTestConfig testConfig = + new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> { + // pcs.completionReported(); + verify(mockAbstractStorageEngine, timeout(10000).times(1)).put(eq(PARTITION_FOO), any(), (ByteBuffer) any()); + Utils.sleep(1000); + verify(zkHelixAdmin, atLeast(1)).setPartitionsToError(anyString(), anyString(), anyString(), anyList()); + }, AA_OFF); + testConfig.setStoreVersionConfigOverride(configOverride -> { + doReturn(true).when(configOverride).isResetErrorReplicaEnabled(); + }); + runTest(testConfig); + } + @Test public void testShouldProduceToVersionTopic() throws Exception { runTest(Collections.singleton(PARTITION_FOO), () -> { From 0eb74e532c6f241b3863fc23fa071e9634a7c0a4 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Thu, 12 Dec 2024 11:41:54 -0800 Subject: [PATCH 3/8] fixed tests --- .../kafka/consumer/StoreIngestionTask.java | 114 +++++++++--------- 1 file changed, 55 insertions(+), 59 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index c13bd5b49b..e2b37f8125 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1405,70 +1405,66 @@ private void processIngestionException() { /** * Special handling for current version when encountering {@link MemoryLimitExhaustedException}. */ - if (isCurrentVersion.getAsBoolean()) { - if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) { - LOGGER.warn( - "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and" - + " resume the consumption after killing ingestion tasks for non current versions"); + if (isCurrentVersion.getAsBoolean() + && ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) { + LOGGER.warn( + "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and" + + " resume the consumption after killing ingestion tasks for non current versions"); + /** + * Pause topic consumption to avoid more damage. + * We can't unsubscribe it since in some scenario, all the partitions can be unsubscribed, and the ingestion task + * will end. Even later on, there are avaiable memory space, we can't resume the ingestion task. + */ + pauseConsumption(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber()); + LOGGER.info( + "Memory limit reached. Pausing consumption of topic-partition: {}", + Utils.getReplicaId( + pubSubTopicPartition.getPubSubTopic().getName(), + pubSubTopicPartition.getPartitionNumber())); + runnableForKillIngestionTasksForNonCurrentVersions.run(); + if (storageEngine.hasMemorySpaceLeft()) { + unSubscribePartition(pubSubTopicPartition, false); /** - * Pause topic consumption to avoid more damage. - * We can't unsubscribe it since in some scenario, all the partitions can be unsubscribed, and the ingestion task - * will end. Even later on, there are avaiable memory space, we can't resume the ingestion task. + * DaVinci ingestion hits memory limit and we would like to retry it in the following way: + * 1. Kill the ingestion tasks for non-current versions. + * 2. Reopen the database since the current database in a bad state, where it can't write or sync even + * there are rooms (bug in SSTFileManager implementation in RocksDB). Reopen will drop the not-yet-synced + * memtable unfortunately. + * 3. Resubscribe the affected partition. */ - pauseConsumption( - pubSubTopicPartition.getPubSubTopic().getName(), - pubSubTopicPartition.getPartitionNumber()); LOGGER.info( - "Memory limit reached. Pausing consumption of topic-partition: {}", - Utils.getReplicaId( - pubSubTopicPartition.getPubSubTopic().getName(), - pubSubTopicPartition.getPartitionNumber())); - runnableForKillIngestionTasksForNonCurrentVersions.run(); - if (storageEngine.hasMemorySpaceLeft()) { - unSubscribePartition(pubSubTopicPartition, false); - /** - * DaVinci ingestion hits memory limit and we would like to retry it in the following way: - * 1. Kill the ingestion tasks for non-current versions. - * 2. Reopen the database since the current database in a bad state, where it can't write or sync even - * there are rooms (bug in SSTFileManager implementation in RocksDB). Reopen will drop the not-yet-synced - * memtable unfortunately. - * 3. Resubscribe the affected partition. - */ - LOGGER.info( - "Ingestion for topic-partition: {} can resume since more space has been reclaimed.", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition)); - storageEngine.reopenStoragePartition(exceptionPartition); - // DaVinci is always a follower. - subscribePartition(pubSubTopicPartition, false); - } - } else { - if (resetErrorReplicaEnabled && !isDaVinciClient) { - zkHelixAdmin.get() - .setPartitionsToError( - serverConfig.getClusterName(), - hostName, - kafkaVersionTopic, - Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition))); - LOGGER.error( - "Marking current version replica status to ERROR for replica: {}", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), - partitionException); - } + "Ingestion for topic-partition: {} can resume since more space has been reclaimed.", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition)); + storageEngine.reopenStoragePartition(exceptionPartition); + // DaVinci is always a follower. + subscribePartition(pubSubTopicPartition, false); } + } else if (isCurrentVersion.getAsBoolean() && resetErrorReplicaEnabled && !isDaVinciClient) { + // marking its replica status ERROR which will later be reset by the controller + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition))); + LOGGER.error( + "Marking current version replica status to ERROR for replica: {}", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + // No need to reset again, clearing out the exception. + partitionIngestionExceptionList.set(exceptionPartition, null); + } else if (!partitionConsumptionState.isCompletionReported()) { + reportError(partitionException.getMessage(), exceptionPartition, partitionException); } else { - if (!partitionConsumptionState.isCompletionReported()) { - reportError(partitionException.getMessage(), exceptionPartition, partitionException); - } else { - LOGGER.error( - "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), - partitionException); - } - // Unsubscribe the partition to avoid more damages. - if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { - // This is not an unsubscribe action from Helix - unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); - } + LOGGER.error( + "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + } + // Unsubscribe the partition to avoid more damages. + if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { + // This is not an unsubscribe action from Helix + unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); } } }); From 3b90c0b564870bc0c237d05c9017d02f97d416ed Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Thu, 12 Dec 2024 16:01:24 -0800 Subject: [PATCH 4/8] fixed tests --- .../kafka/consumer/StoreIngestionTask.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index e2b37f8125..7fe71ec6eb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1453,18 +1453,20 @@ private void processIngestionException() { partitionException); // No need to reset again, clearing out the exception. partitionIngestionExceptionList.set(exceptionPartition, null); - } else if (!partitionConsumptionState.isCompletionReported()) { - reportError(partitionException.getMessage(), exceptionPartition, partitionException); } else { - LOGGER.error( - "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", - Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), - partitionException); - } - // Unsubscribe the partition to avoid more damages. - if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { - // This is not an unsubscribe action from Helix - unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); + if (!partitionConsumptionState.isCompletionReported()) { + reportError(partitionException.getMessage(), exceptionPartition, partitionException); + } else { + LOGGER.error( + "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.", + Utils.getReplicaId(kafkaVersionTopic, exceptionPartition), + partitionException); + } + // Unsubscribe the partition to avoid more damages. + if (partitionConsumptionStateMap.containsKey(exceptionPartition)) { + // This is not an unsubscribe action from Helix + unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, exceptionPartition), false); + } } } }); From 02bbfaacd0f101f8d382cee83f25f19cf5d02a8f Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Tue, 17 Dec 2024 09:59:05 -0800 Subject: [PATCH 5/8] moved reset method --- .../kafka/consumer/StoreIngestionTask.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 7fe71ec6eb..2893e8acad 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1803,6 +1803,15 @@ private void reportError( } else { ingestionNotificationDispatcher.reportError(pcsList, message, consumerEx); } + // Set the replica state to ERROR so that the controller can attempt to reset the partition. + if (!isDaVinciClient && resetErrorReplicaEnabled) { + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, partitionId))); + } } private void internalClose(boolean doFlush) { @@ -4235,16 +4244,7 @@ public void reportError(String message, int userPartition, Exception e) { if (partitionConsumptionStateMap.containsKey(userPartition)) { pcsList.add(partitionConsumptionStateMap.get(userPartition)); } - ingestionNotificationDispatcher.reportError(pcsList, message, e); - // Set the replica state to ERROR so that the controller can attempt to reset the partition. - if (!isDaVinciClient && resetErrorReplicaEnabled) { - zkHelixAdmin.get() - .setPartitionsToError( - serverConfig.getClusterName(), - hostName, - kafkaVersionTopic, - Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, userPartition))); - } + reportError(pcsList, userPartition, message, e); } public boolean isActiveActiveReplicationEnabled() { From 40ca4501416dba91753693f612a8b0cba578598b Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Tue, 17 Dec 2024 15:56:48 -0800 Subject: [PATCH 6/8] addressed Xun's comments --- .../helix/StateModelIngestionProgressNotifier.java | 3 ++- .../davinci/kafka/consumer/StoreIngestionTask.java | 11 +++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java index b16ffe7bfe..caa3e6e003 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/StateModelIngestionProgressNotifier.java @@ -5,6 +5,7 @@ import com.linkedin.davinci.kafka.consumer.StoreIngestionService; import com.linkedin.davinci.notifier.VeniceNotifier; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.exceptions.VeniceTimeoutException; import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.util.Map; @@ -51,7 +52,7 @@ void waitConsumptionCompleted( // Report ingestion_failure String storeName = Version.parseStoreFromKafkaTopicName(resourceName); storeIngestionService.recordIngestionFailure(storeName); - VeniceException veniceException = new VeniceException(errorMsg); + VeniceTimeoutException veniceException = new VeniceTimeoutException(errorMsg); storeIngestionService.getStoreIngestionTask(resourceName).reportError(errorMsg, partitionId, veniceException); } stateModelToIngestionCompleteFlagMap.remove(stateModelId); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 2893e8acad..356a212f5d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -4245,6 +4245,17 @@ public void reportError(String message, int userPartition, Exception e) { pcsList.add(partitionConsumptionStateMap.get(userPartition)); } reportError(pcsList, userPartition, message, e); + ingestionNotificationDispatcher.reportError(pcsList, message, e); + // Set the replica state to ERROR so that the controller can attempt to reset the partition. + if (isCurrentVersion.getAsBoolean() && !isDaVinciClient && resetErrorReplicaEnabled + && !(e instanceof VeniceTimeoutException)) { + zkHelixAdmin.get() + .setPartitionsToError( + serverConfig.getClusterName(), + hostName, + kafkaVersionTopic, + Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, userPartition))); + } } public boolean isActiveActiveReplicationEnabled() { From 1500ea421ff0d52fbc9bef684f657d81aaefb6f0 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Tue, 17 Dec 2024 16:05:37 -0800 Subject: [PATCH 7/8] addressed Xun's comments --- .../davinci/kafka/consumer/StoreIngestionTask.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 356a212f5d..2d3055db3b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1804,7 +1804,8 @@ private void reportError( ingestionNotificationDispatcher.reportError(pcsList, message, consumerEx); } // Set the replica state to ERROR so that the controller can attempt to reset the partition. - if (!isDaVinciClient && resetErrorReplicaEnabled) { + if (isCurrentVersion.getAsBoolean() && !isDaVinciClient && resetErrorReplicaEnabled + && (consumerEx instanceof VeniceTimeoutException)) { zkHelixAdmin.get() .setPartitionsToError( serverConfig.getClusterName(), @@ -4246,16 +4247,6 @@ public void reportError(String message, int userPartition, Exception e) { } reportError(pcsList, userPartition, message, e); ingestionNotificationDispatcher.reportError(pcsList, message, e); - // Set the replica state to ERROR so that the controller can attempt to reset the partition. - if (isCurrentVersion.getAsBoolean() && !isDaVinciClient && resetErrorReplicaEnabled - && !(e instanceof VeniceTimeoutException)) { - zkHelixAdmin.get() - .setPartitionsToError( - serverConfig.getClusterName(), - hostName, - kafkaVersionTopic, - Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, userPartition))); - } } public boolean isActiveActiveReplicationEnabled() { From ef66c883ca5d84305f9fd702fc969ff5872d7b82 Mon Sep 17 00:00:00 2001 From: Sourav Maji Date: Tue, 17 Dec 2024 16:30:08 -0800 Subject: [PATCH 8/8] addressed Xun's comments --- .../com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 2d3055db3b..2483370b19 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1805,7 +1805,7 @@ private void reportError( } // Set the replica state to ERROR so that the controller can attempt to reset the partition. if (isCurrentVersion.getAsBoolean() && !isDaVinciClient && resetErrorReplicaEnabled - && (consumerEx instanceof VeniceTimeoutException)) { + && !(consumerEx instanceof VeniceTimeoutException)) { zkHelixAdmin.get() .setPartitionsToError( serverConfig.getClusterName(),