Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Reset current version replica to ERROR on ingestion error #1387

Merged
merged 8 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1405,8 +1405,8 @@ private void processIngestionException() {
/**
* Special handling for current version when encountering {@link MemoryLimitExhaustedException}.
*/
if (ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)
&& isCurrentVersion.getAsBoolean()) {
if (isCurrentVersion.getAsBoolean()
&& ExceptionUtils.recursiveClassEquals(partitionException, MemoryLimitExhaustedException.class)) {
LOGGER.warn(
"Encountered MemoryLimitExhaustedException, and ingestion task will try to reopen the database and"
+ " resume the consumption after killing ingestion tasks for non current versions");
Expand Down Expand Up @@ -1439,10 +1439,23 @@ private void processIngestionException() {
// DaVinci is always a follower.
subscribePartition(pubSubTopicPartition, false);
}
} else if (isCurrentVersion.getAsBoolean() && resetErrorReplicaEnabled && !isDaVinciClient) {
xunyin8 marked this conversation as resolved.
Show resolved Hide resolved
// marking its replica status ERROR which will later be reset by the controller
zkHelixAdmin.get()
.setPartitionsToError(
serverConfig.getClusterName(),
hostName,
kafkaVersionTopic,
Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, exceptionPartition)));
LOGGER.error(
"Marking current version replica status to ERROR for replica: {}",
Utils.getReplicaId(kafkaVersionTopic, exceptionPartition),
partitionException);
// No need to reset again, clearing out the exception.
partitionIngestionExceptionList.set(exceptionPartition, null);
} else {
if (!partitionConsumptionState.isCompletionReported()) {
reportError(partitionException.getMessage(), exceptionPartition, partitionException);

} else {
LOGGER.error(
"Ignoring exception for replica: {} since it is already online. The replica will continue serving reads, but the data may be stale as it is not actively ingesting data. Please engage the Venice DEV team immediately.",
Expand Down Expand Up @@ -1790,6 +1803,15 @@ private void reportError(
} else {
ingestionNotificationDispatcher.reportError(pcsList, message, consumerEx);
}
// Set the replica state to ERROR so that the controller can attempt to reset the partition.
if (!isDaVinciClient && resetErrorReplicaEnabled) {
majisourav99 marked this conversation as resolved.
Show resolved Hide resolved
zkHelixAdmin.get()
.setPartitionsToError(
serverConfig.getClusterName(),
hostName,
kafkaVersionTopic,
Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, partitionId)));
}
}

private void internalClose(boolean doFlush) {
Expand Down Expand Up @@ -4222,16 +4244,7 @@ public void reportError(String message, int userPartition, Exception e) {
if (partitionConsumptionStateMap.containsKey(userPartition)) {
pcsList.add(partitionConsumptionStateMap.get(userPartition));
}
ingestionNotificationDispatcher.reportError(pcsList, message, e);
// Set the replica state to ERROR so that the controller can attempt to reset the partition.
if (!isDaVinciClient && resetErrorReplicaEnabled) {
zkHelixAdmin.get()
.setPartitionsToError(
serverConfig.getClusterName(),
hostName,
kafkaVersionTopic,
Collections.singletonList(HelixUtils.getPartitionName(kafkaVersionTopic, userPartition)));
}
reportError(pcsList, userPartition, message, e);
}

public boolean isActiveActiveReplicationEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX;
import static com.linkedin.venice.ConfigKeys.SERVER_RESET_ERROR_REPLICA_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_UNSUB_AFTER_BATCHPUSH;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
Expand All @@ -46,6 +47,7 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -385,6 +387,8 @@ public static Object[][] sortedInputAndAAConfigProvider() {

private Runnable runnableForKillNonCurrentVersion;

private ZKHelixAdmin zkHelixAdmin;

private static final int PARTITION_COUNT = 10;
private static final Set<Integer> ALL_PARTITIONS = new HashSet<>();
static {
Expand Down Expand Up @@ -858,6 +862,8 @@ private void runTest(
Properties kafkaProps = new Properties();
kafkaProps.put(KAFKA_BOOTSTRAP_SERVERS, inMemoryLocalKafkaBroker.getKafkaBootstrapServer());

zkHelixAdmin = mock(ZKHelixAdmin.class);
doNothing().when(zkHelixAdmin).setPartitionsToError(anyString(), anyString(), anyString(), anyList());
storeIngestionTaskUnderTest = spy(
ingestionTaskFactory.getNewIngestionTask(
storageService,
Expand All @@ -870,7 +876,7 @@ private void runTest(
false,
Optional.empty(),
recordTransformerFunction,
Lazy.of(() -> mock(ZKHelixAdmin.class))));
Lazy.of(() -> zkHelixAdmin)));

Future testSubscribeTaskFuture = null;
try {
Expand Down Expand Up @@ -2830,6 +2836,7 @@ private VeniceServerConfig buildVeniceServerConfig(Map<String, Object> extraProp
propertyBuilder.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 1000);
propertyBuilder.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 1000);
propertyBuilder.put(SERVER_RESUBSCRIPTION_TRIGGERED_BY_VERSION_INGESTION_CONTEXT_CHANGE_ENABLED, true);
propertyBuilder.put(SERVER_RESET_ERROR_REPLICA_ENABLED, true);
extraProperties.forEach(propertyBuilder::put);

Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();
Expand Down Expand Up @@ -4371,8 +4378,7 @@ public void testIngestionTaskForNonCurrentVersionShouldFailWhenEncounteringMemor
}

@Test
public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEncounteringMemoryLimitException()
throws Exception {
public void testIngestionTaskCurrentVersionKillOngoingPushOnMemoryLimitException() throws Exception {
doThrow(new MemoryLimitExhaustedException("mock exception")).doNothing()
.when(mockAbstractStorageEngine)
.put(anyInt(), any(), (ByteBuffer) any());
Expand All @@ -4391,6 +4397,30 @@ public void testIngestionTaskForCurrentVersionShouldTryToKillOngoingPushWhenEnco
}, AA_OFF);
}

/**
 * Verifies that when a current-version replica hits a generic ingestion failure (a {@link VeniceException}
 * thrown by the storage engine on the first put) and the store version config reports error-replica reset as
 * enabled, the ingestion task asks the Helix admin to mark the partition as ERROR.
 */
@Test
public void testIngestionTaskForCurrentVersionResetException() throws Exception {
  // The first storage-engine put throws; any later put is a no-op.
  doThrow(new VeniceException("mock exception")).doNothing()
      .when(mockAbstractStorageEngine)
      .put(anyInt(), any(), (ByteBuffer) any());
  isCurrentVersion = () -> true;

  localVeniceWriter.broadcastStartOfPush(new HashMap<>());
  localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null).get();
  localVeniceWriter.broadcastEndOfPush(new HashMap<>());

  StoreIngestionTaskTestConfig testConfig =
      new StoreIngestionTaskTestConfig(Collections.singleton(PARTITION_FOO), () -> {
        verify(mockAbstractStorageEngine, timeout(10000).times(1)).put(eq(PARTITION_FOO), any(), (ByteBuffer) any());
        // Poll for the Helix call instead of sleeping for a fixed interval: the verification returns as soon
        // as the call is observed and tolerates slow machines up to the timeout.
        verify(zkHelixAdmin, timeout(10000).atLeast(1))
            .setPartitionsToError(anyString(), anyString(), anyString(), anyList());
      }, AA_OFF);
  testConfig.setStoreVersionConfigOverride(configOverride -> {
    doReturn(true).when(configOverride).isResetErrorReplicaEnabled();
  });
  runTest(testConfig);
}

@Test
public void testShouldProduceToVersionTopic() throws Exception {
runTest(Collections.singleton(PARTITION_FOO), () -> {
Expand Down
Loading