Skip to content

Commit

Permalink
Fix MSK integration test fix (#3465)
Browse files Browse the repository at this point in the history
Signed-off-by: Kondaka <krishkdk@bcd07441e083.ant.amazon.com>
Co-authored-by: Kondaka <krishkdk@bcd07441e083.ant.amazon.com>
  • Loading branch information
kkondaka and Kondaka authored Oct 9, 2023
1 parent dd8f703 commit 68875c4
Showing 1 changed file with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.AwsIamAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.MskBrokerConnectionType;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
Expand Down Expand Up @@ -166,14 +167,19 @@ public void setup() {
when(avroTopic.getAutoCommit()).thenReturn(false);
when(avroTopic.getAutoOffsetReset()).thenReturn("earliest");
when(avroTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(avroTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(5));
when(avroTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(300));
when(avroTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(10));
when(avroTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
when(avroTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.INCLUDE_AS_FIELD);
when(jsonTopic.getName()).thenReturn(testTopic);
when(jsonTopic.getMaxPollInterval()).thenReturn(Duration.ofSeconds(300));
when(jsonTopic.getGroupId()).thenReturn(testGroup);
when(jsonTopic.getWorkers()).thenReturn(1);
when(jsonTopic.getAutoCommit()).thenReturn(false);
when(jsonTopic.getAutoOffsetReset()).thenReturn("earliest");
when(jsonTopic.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(jsonTopic.getSessionTimeOut()).thenReturn(Duration.ofSeconds(10));
when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.INCLUDE_AS_FIELD);
when(jsonTopic.getHeartBeatInterval()).thenReturn(Duration.ofSeconds(3));
bootstrapServers = System.getProperty("tests.kafka.bootstrap_servers");
testRegistryName = System.getProperty("tests.kafka.glue_registry_name");
Expand Down Expand Up @@ -234,10 +240,10 @@ public void TestJsonRecordConsumer() throws Exception {
for (int i = 0; i < numRecords; i++) {
Record<Event> record = receivedRecords.get(i);
Event event = (Event)record.getData();
Map<String, Object> val = event.get("message-"+i, Map.class);
assertThat(val.get("username"), equalTo(TEST_USER+i));
assertThat(val.get("message"), equalTo(TEST_MESSAGE+i));
assertThat(((Number)val.get("timestamp")).intValue(), equalTo(TEST_TIMESTAMP_INT+i));
assertThat(event.get("username", String.class), equalTo(TEST_USER+i));
assertThat(event.get("message", String.class), equalTo(TEST_MESSAGE+i));
assertThat(event.get("timestamp", Number.class).intValue(), equalTo(TEST_TIMESTAMP_INT+i));
assertThat(event.get("kafka_key", String.class), equalTo("message-"+i));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -301,10 +307,10 @@ public void TestAvroRecordConsumer() throws Exception {
for (int i = 0; i < numRecords; i++) {
Record<Event> record = receivedRecords.get(i);
Event event = (Event)record.getData();
Map<String, Object> val = event.get(TEST_USER+i, Map.class);
assertThat(val.get("username"), equalTo(TEST_USER+i));
assertThat(val.get("message"), equalTo(TEST_MESSAGE+i));
assertThat(((Number)val.get("timestamp")).longValue(), equalTo(TEST_TIMESTAMP+i));
assertThat(event.get("username", String.class), equalTo(TEST_USER+i));
assertThat(event.get("message", String.class), equalTo(TEST_MESSAGE+i));
assertThat(event.get("timestamp", Number.class).longValue(), equalTo(TEST_TIMESTAMP+i));
assertThat(event.get("kafka_key", String.class), equalTo(TEST_USER+i));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down

0 comments on commit 68875c4

Please sign in to comment.