[server] Set current version replicas to ERROR state on hitting ingestion exception
Sourav Maji committed Dec 12, 2024
1 parent 7b58033 commit 82e037a
Showing 2 changed files with 79 additions and 44 deletions.
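In substance, the commit restructures processIngestionException() so that the current serving version gets explicit handling for every exception type: memory exhaustion keeps its pause/reclaim/resubscribe retry, while any other ingestion exception now flips the replica to Helix's ERROR state, gated on the reset-error-replica server config and skipped for Da Vinci clients. A condensed, schematic sketch of the new decision flow (field and method names mirror the diff below; the stub bodies are placeholders, not Venice's implementation):

// Condensed sketch of the restructured processIngestionException() flow.
final class IngestionExceptionFlowSketch {
  boolean isCurrentVersion; // isCurrentVersion.getAsBoolean()
  boolean isMemoryLimitException; // recursiveClassEquals(e, MemoryLimitExhaustedException.class)
  boolean resetErrorReplicaEnabled; // server config gate
  boolean isDaVinciClient;
  boolean completionReported; // partitionConsumptionState.isCompletionReported()

  void processIngestionException() {
    if (isCurrentVersion) {
      if (isMemoryLimitException) {
        // Pause consumption, kill non-current-version ingestion, reopen RocksDB,
        // resubscribe once space is reclaimed.
        retryAfterReclaimingMemory();
      } else if (resetErrorReplicaEnabled && !isDaVinciClient) {
        // NEW: any other ingestion exception flips the current-version replica
        // to Helix ERROR state instead of falling through the generic path.
        markReplicaError();
      }
    } else if (!completionReported) {
      reportError(); // fail the ongoing (non-current) push
    } else if (resetErrorReplicaEnabled && !isDaVinciClient) {
      markReplicaError(); // completed-but-not-current replica: also flip to ERROR
    } else {
      logAndIgnore(); // keep serving possibly stale reads; engage the Venice team
    }
  }

  void retryAfterReclaimingMemory() {}
  void markReplicaError() {}
  void reportError() {}
  void logAndIgnore() {}
}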
File 1 of 2:

@@ -1405,54 +1405,59 @@ private void processIngestionException() {
       /**
        * Special handling for current version when encountering {@link MemoryLimitExhaustedException}.
        */
-      if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)
-          && isCurrentVersion.getAsBoolean()) {
-        LOGGER.warn(
-            "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and"
-                + " resume the consumption after killing ingestion tasks for non current versions");
-        /**
-         * Pause topic consumption to avoid more damage.
-         * We can't unsubscribe it since in some scenarios all the partitions can be unsubscribed, and the ingestion
-         * task will end. Even if memory space becomes available later on, we can't resume the ingestion task.
-         */
-        pauseConsumption(pubSubTopicPartition.getPubSubTopic().getName(), pubSubTopicPartition.getPartitionNumber());
-        LOGGER.info(
-            "Memory limit reached. Pausing consumption of topic-partition: {}",
-            Utils.getReplicaId(
-                pubSubTopicPartition.getPubSubTopic().getName(),
-                pubSubTopicPartition.getPartitionNumber()));
-        runnableForKillIngestionTasksForNonCurrentVersions.run();
-        if (storageEngine.hasMemorySpaceLeft()) {
-          unSubscribePartition(pubSubTopicPartition, false);
-          /**
-           * DaVinci ingestion hit the memory limit, and we would like to retry it in the following way:
-           * 1. Kill the ingestion tasks for non-current versions.
-           * 2. Reopen the database, since the current database is in a bad state where it can't write or sync
-           *    even when there is room (bug in the SSTFileManager implementation in RocksDB). Reopen will
-           *    unfortunately drop the not-yet-synced memtable.
-           * 3. Resubscribe the affected partition.
-           */
-          LOGGER.info(
-              "Ingestion for topic-partition: {} can resume since more space has been reclaimed.",
-              Utils.getReplicaId(kafkaVersionTopic, exceptionPartition));
-          storageEngine.reopenStoragePartition(exceptionPartition);
-          // DaVinci is always a follower.
-          subscribePartition(pubSubTopicPartition, false);
-        }
+      if (isCurrentVersion.getAsBoolean()) {
+        if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) {
+          LOGGER.warn(
+              "Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and"
+                  + " resume the consumption after killing ingestion tasks for non current versions");
+          /**
+           * Pause topic consumption to avoid more damage.
+           * We can't unsubscribe it since in some scenarios all the partitions can be unsubscribed, and the ingestion
+           * task will end. Even if memory space becomes available later on, we can't resume the ingestion task.
+           */
+          pauseConsumption(
+              pubSubTopicPartition.getPubSubTopic().getName(),
+              pubSubTopicPartition.getPartitionNumber());
+          LOGGER.info(
+              "Memory limit reached. Pausing consumption of topic-partition: {}",
+              Utils.getReplicaId(
+                  pubSubTopicPartition.getPubSubTopic().getName(),
+                  pubSubTopicPartition.getPartitionNumber()));
+          runnableForKillIngestionTasksForNonCurrentVersions.run();
+          if (storageEngine.hasMemorySpaceLeft()) {
+            unSubscribePartition(pubSubTopicPartition, false);
+            /**
+             * DaVinci ingestion hit the memory limit, and we would like to retry it in the following way:
+             * 1. Kill the ingestion tasks for non-current versions.
+             * 2. Reopen the database, since the current database is in a bad state where it can't write or sync
+             *    even when there is room (bug in the SSTFileManager implementation in RocksDB). Reopen will
+             *    unfortunately drop the not-yet-synced memtable.
+             * 3. Resubscribe the affected partition.
+             */
+            LOGGER.info(
+                "Ingestion for topic-partition: {} can resume since more space has been reclaimed.",
+                Utils.getReplicaId(kafkaVersionTopic, exceptionPartition));
+            storageEngine.reopenStoragePartition(exceptionPartition);
+            // DaVinci is always a follower.
+            subscribePartition(pubSubTopicPartition, false);
+          }
+        } else {
+          if (resetErrorReplicaEnabled && !isDaVinciClient) {
+            zkHelixAdmin.get()
+                .setPartitionsToError(
+                    serverConfig.getClusterName(),
+                    hostName,
+                    kafkaVersionTopic,
+                    Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition)));
+            LOGGER.error(
+                "Marking current version replica status to ERROR for replica: {}",
+                Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
+                partitionException);
+          }
+        }
       } else {
         if (!partitionConsumptionState.isCompletionReported()) {
           reportError(partitionException.getMessage(), exceptionPartition, partitionException);
         } else if (resetErrorReplicaEnabled && !isDaVinciClient) {
           zkHelixAdmin.get()
               .setPartitionsToError(
                   serverConfig.getClusterName(),
                   hostName,
                   kafkaVersionTopic,
                   Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition)));
           LOGGER.error(
               "Marking replica status to ERROR for replica: {}",
               Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
               partitionException);
         } else {
           LOGGER.error(
               "Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.",
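The Helix side of the new current-version branch is one admin call. A minimal standalone sketch of it, under stated assumptions: the ZooKeeper address, cluster, instance, and resource names are hypothetical, and HelixUtils.getPartitionName is assumed to produce Helix's usual <resource>_<partitionId> partition name.

import java.util.Collections;

import org.apache.helix.manager.zk.ZKHelixAdmin;

public class MarkReplicaErrorSketch {
  public static void main(String[] args) {
    // Admin client pointed at the cluster's ZooKeeper (address is hypothetical).
    ZKHelixAdmin admin = new ZKHelixAdmin("zk-host:2181");
    // Mirrors zkHelixAdmin.get().setPartitionsToError(...) in the diff above: flip the
    // replica of partition 3 of version topic "store_v1" hosted on the given instance
    // to ERROR state.
    admin.setPartitionsToError(
        "venice-cluster0", // cluster name (hypothetical)
        "server-host_1690", // Helix instance id of the replica's host (hypothetical)
        "store_v1", // resource, i.e. the version topic
        Collections.singletonList("store_v1_3")); // "<resource>_<partitionId>"
  }
}

An ERROR replica drops out of the serving set until an operator or an automated reset intervenes, which is the externally visible effect the new test below checks through a mocked admin.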
File 2 of 2:
@@ -27,6 +27,7 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
 import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX;
+import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
 import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
@@ -46,6 +47,7 @@
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyDouble;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -385,6 +387,8 @@ public static Object[][] sortedInputAndAAConfigProvider() {

   private Runnable runnableForKillNonCurrentVersion;

+  private ZKHelixAdmin zkHelixAdmin;
+
   private static final int PARTITION_COUNT = 10;
   private static final Set<Integer> ALL_PARTITIONS = new HashSet<>();
   static {
@@ -858,6 +862,8 @@ private void runTest(
     Properties kafkaProps = new Properties();
     kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer());

+    zkHelixAdmin = mock(ZKHelixAdmin.class);
+    doNothing().when(zkHelixAdmin).setPartitionsToError(anyString(), anyString(), anyString(), anyList());
     storeIngestionTaskUnderTest = spy(
         ingestionTaskFactory.getNewIngestionTask(
             storageService,
@@ -870,7 +876,7 @@
             false,
             Optional.empty(),
             recordTransformerFunction,
-            Lazy.of(() -> mock(ZKHelixAdmin.class))));
+            Lazy.of(() -> zkHelixAdmin)));

     Future testSubscribeTaskFuture = null;
     try {
@@ -2829,6 +2835,7 @@ private VeniceServerConfig buildVeniceServerConfig(Map<String, Object> extraProperties) {
     propertyBuilder.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 1000);
     propertyBuilder.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 1000);
     propertyBuilder.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true);
+    propertyBuilder.put(SERVER_RESET_ERROR_REPLICA_ENABLED, true);
     extraProperties.forEach(propertyBuilder::put);

     Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();
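The flag added above is read through the same ConfigKeys constant on the server side and surfaces to the ingestion task as isResetErrorReplicaEnabled() (the getter name used by the test further down). A tiny hypothetical illustration of the property form; the literal key string behind the constant is defined in ConfigKeys and not shown in this diff:

import java.util.Properties;

import com.linkedin.venice.ConfigKeys;

public class EnableResetErrorReplicaSketch {
  public static void main(String[] args) {
    Properties serverProps = new Properties();
    // ConfigKeys constants are String keys; this enables the new ERROR-marking behavior.
    serverProps.put(ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED, "true");
    System.out.println(serverProps);
  }
}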
@@ -4370,8 +4377,7 @@ public void testIngestionTaskForNonCurrentVersionShouldFailWhenEncounteringMemoryLimitException() {
   }

   @Test
-  public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEncounteringMemoryLimitException()
-      throws Exception {
+  public void testIngestionTaskCurrentVersionKillOngoingPushOnMemoryLimitException() throws Exception {
     doThrow(new MemoryLimitExhaustedException("mock exception")).doNothing()
         .when(mockAbstractStorageEngine)
         .put(anyInt(), any(), (ByteBuffer) any());
@@ -4390,6 +4396,30 @@
     }, AA_OFF);
   }

+  @Test
+  public void testIngestionTaskForCurrentVersionResetException() throws Exception {
+    doThrow(new VeniceException("mock exception")).doNothing()
+        .when(mockAbstractStorageEngine)
+        .put(anyInt(), any(), (ByteBuffer) any());
+    isCurrentVersion = () -> true;
+
+    localVeniceWriter.broadcastStartOfPush(new HashMap<>());
+    localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null).get();
+    localVeniceWriter.broadcastEndOfPush(new HashMap<>());
+
+    StoreIngestionTaskTestConfig testConfig =
+        new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> {
+          // pcs.completionReported();
+          verify(mockAbstractStorageEngine, timeout(10000).times(1)).put(eq(PARTITION_FOO), any(), (ByteBuffer) any());
+          Utils.sleep(1000);
+          verify(zkHelixAdmin, atLeast(1)).setPartitionsToError(anyString(), anyString(), anyString(), anyList());
+        }, AA_OFF);
+    testConfig.setStoreVersionConfigOverride(configOverride -> {
+      doReturn(true).when(configOverride).isResetErrorReplicaEnabled();
+    });
+    runTest(testConfig);
+  }
+
   @Test
   public void testShouldProduceToVersionTopic() throws Exception {
     runTest(Collections.singleton(PARTITION_FOO), () -> {
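A side note on the verification idiom in the new test: the fixed Utils.sleep(1000) followed by atLeast(1) could also be written with Mockito's polling timeout() verification mode, which returns as soon as the asynchronous call lands. A possible drop-in for the test's last assertion, using the same static imports the file already has:

// Poll for up to 10 seconds for the async ERROR-marking call instead of a fixed sleep.
verify(zkHelixAdmin, timeout(10000).atLeast(1))
    .setPartitionsToError(anyString(), anyString(), anyString(), anyList());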