Skip to content

Commit

Permalink
Remove 2nd case
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Nov 23, 2024
1 parent 2842dd0 commit de38ec6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1111,18 +1111,18 @@ public void consumerSubscribe(PubSubTopicPartition pubSubTopicPartition, long st
VeniceServerConfig serverConfig = getServerConfig();
if (isDaVinciClient() || serverConfig.getKafkaClusterUrlToIdMap().containsKey(pubSubAddress)) {
super.consumerSubscribe(pubSubTopicPartition, startOffset, pubSubAddress);
} else {
LOGGER.error(
"PubSub address: {} is not in the pubsub cluster map: {}. Cannot subscribe to topic-partition: {}",
pubSubAddress,
serverConfig.getKafkaClusterUrlToIdMap(),
pubSubTopicPartition);
throw new VeniceException(
String.format(
"PubSub address: %s is not in the pubsub cluster map. Cannot subscribe to topic-partition: %s",
pubSubAddress,
pubSubTopicPartition));
return;
}
LOGGER.error(
"PubSub address: {} is not in the pubsub cluster map: {}. Cannot subscribe to topic-partition: {}",
pubSubAddress,
serverConfig.getKafkaClusterUrlToIdMap(),
pubSubTopicPartition);
throw new VeniceException(
String.format(
"PubSub address: %s is not in the pubsub cluster map. Cannot subscribe to topic-partition: %s",
pubSubAddress,
pubSubTopicPartition));
}

private long calculateRewindStartTime(PartitionConsumptionState partitionConsumptionState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -741,7 +742,7 @@ public void getKeyLevelLockMaxPoolSizeBasedOnServerConfigTest() {
@Test
public void testConsumerSubscribe() {
// Case 1: AA store ingestion task with invalid pubsub address
ActiveActiveStoreIngestionTask ingestionTask = mock(ActiveActiveStoreIngestionTask.class);
ActiveActiveStoreIngestionTask ingestionTask = spy(ActiveActiveStoreIngestionTask.class);
VeniceServerConfig mockServerConfig = mock(VeniceServerConfig.class);

when(ingestionTask.getServerConfig()).thenReturn(mockServerConfig);
Expand All @@ -764,17 +765,19 @@ public void testConsumerSubscribe() {
verify(ingestionTask, times(1)).consumerSubscribe(pubSubTopicPartition, 100L, "invalidPubSubAddress");

// Case 2: DaVinci client
ActiveActiveStoreIngestionTask dvcIngestionTask = mock(ActiveActiveStoreIngestionTask.class);
ActiveActiveStoreIngestionTask dvcIngestionTask = spy(ActiveActiveStoreIngestionTask.class);
doCallRealMethod().when(dvcIngestionTask).consumerSubscribe(any(), anyLong(), anyString());
when(dvcIngestionTask.getServerConfig()).thenReturn(mockServerConfig);
when(dvcIngestionTask.isDaVinciClient()).thenReturn(true);
when(mockServerConfig.getKafkaClusterUrlToIdMap()).thenReturn(Object2IntMaps.emptyMap());
try {
dvcIngestionTask.consumerSubscribe(pubSubTopicPartition, 100L, "validPubSubAddress");
} catch (Exception e) {
assertFalse(
e.getMessage().contains("is not in the pubsub cluster map"),
"Exception message should not contain the expected message but found: " + e.getMessage());
if (e.getMessage() != null) {
assertFalse(
e.getMessage().contains("is not in the pubsub cluster map"),
"Exception message should not contain the expected message but found: " + e.getMessage());
}
}
verify(dvcIngestionTask, times(1)).consumerSubscribe(pubSubTopicPartition, 100L, "validPubSubAddress");
}
Expand Down

0 comments on commit de38ec6

Please sign in to comment.