UsingTestContainerForKafkaIntegrationTests (#38884)
* Add integration tests for the source connector using Testcontainers

---------

Co-authored-by: annie-mac <xinlian@microsoft.com>
xinlian12 and annie-mac authored Mar 1, 2024
1 parent 6c1f680 commit 12bec49
Showing 10 changed files with 700 additions and 5 deletions.
3 changes: 3 additions & 0 deletions eng/versioning/external_dependencies.txt
@@ -393,6 +393,9 @@ cosmos_org.scalastyle:scalastyle-maven-plugin;1.0.0
cosmos_org.apache.kafka:connect-api;3.6.0
# Cosmos Kafka connector tests only
cosmos_org.apache.kafka:connect-runtime;3.6.0
cosmos_org.testcontainers:testcontainers;1.19.5
cosmos_org.testcontainers:kafka;1.19.5
cosmos_org.sourcelab:kafka-connect-client;4.0.4
# Maven Tools for Cosmos Kafka connector only
cosmos_io.confluent:kafka-connect-maven-plugin;0.12.0

20 changes: 19 additions & 1 deletion sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
@@ -173,7 +173,24 @@ Licensed under the MIT License.
<version>1.14.8</version> <!-- {x-version-update;testdep_net.bytebuddy:byte-buddy-agent;external_dependency} -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.5</version> <!-- {x-version-update;cosmos_org.testcontainers:testcontainers;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.5</version> <!-- {x-version-update;cosmos_org.testcontainers:kafka;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-connect-client</artifactId>
<version>4.0.4</version> <!-- {x-version-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -218,6 +235,7 @@ Licensed under the MIT License.
<include>com.azure:*</include>
<include>org.apache.kafka:connect-api:[3.6.0]</include> <!-- {x-include-update;cosmos_org.apache.kafka:connect-api;external_dependency} -->
<include>io.confluent:kafka-connect-maven-plugin:[0.12.0]</include> <!-- {x-include-update;cosmos_io.confluent:kafka-connect-maven-plugin;external_dependency} -->
<include>org.sourcelab:kafka-connect-client:[4.0.4]</include> <!-- {x-include-update;cosmos_org.sourcelab:kafka-connect-client;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
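
The three test-scoped dependencies above follow the standard Testcontainers pattern: the Kafka broker runs in Docker and the test talks to a host-mapped bootstrap address. A minimal sketch of that pattern, not part of this commit (the image tag is an assumption):

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public final class KafkaBrokerSketch {
    public static void main(String[] args) {
        // start a single-broker Kafka in Docker; stopped automatically by try-with-resources
        try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.3"))) {
            kafka.start();
            // producers, consumers, and admin clients point at the host-mapped address
            System.out.println("bootstrap.servers = " + kafka.getBootstrapServers());
        }
    }
}
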
4 changes: 3 additions & 1 deletion sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/source/CosmosSourceTask.java
@@ -61,6 +61,8 @@ public void start(Map<String, String> map) {

this.taskUnitsQueue.addAll(this.taskConfig.getFeedRangeTaskUnits());
LOGGER.info("Creating the cosmos client");

// TODO[GA]: optimize the client creation, client metadata cache?
this.cosmosClient = CosmosClientStore.getCosmosClient(this.taskConfig.getAccountConfig());
}

@@ -95,7 +97,7 @@ public List<SourceRecord> poll() {
}

stopwatch.stop();
- LOGGER.info(
+ LOGGER.debug(
"Return {} records, databaseName {}, containerName {}, containerRid {}, feedRange {}, durationInMs {}",
results.size(),
((FeedRangeTaskUnit) taskUnit).getDatabaseName(),
@@ -0,0 +1,20 @@
#!/usr/bin/env pwsh
$ErrorActionPreference = 'Stop'
Set-Location $PSScriptRoot

Write-Host "Deleting prior Cosmos DB connectors..."
Remove-Item -Recurse -Force "$PSScriptRoot/src/test/connectorPlugins/connectors" -ErrorAction Ignore
New-Item -Path "$PSScriptRoot/src/test/connectorPlugins" -ItemType "directory" -Name "connectors" -Force | Out-Null

Write-Host "Rebuilding Cosmos DB connectors..."
mvn clean package -DskipTests -Dmaven.javadoc.skip
Copy-Item target/*-jar-with-dependencies.jar "$PSScriptRoot/src/test/connectorPlugins/connectors"

Write-Host "Adding custom Insert UUID SMT"
Set-Location "$PSScriptRoot/src/test/connectorPlugins/connectors"
git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git insertuuid -q && Set-Location insertuuid
mvn clean package -DskipTests=true
Copy-Item target/*.jar "$PSScriptRoot/src/test/connectorPlugins/connectors"
Set-Location $PSScriptRoot # step out of insertuuid before deleting it
Remove-Item -Recurse -Force "$PSScriptRoot/src/test/connectorPlugins/connectors/insertuuid"
@@ -0,0 +1,18 @@
#!/bin/bash
set -e

echo "Deleting prior Cosmos DB connectors..."
rm -rf src/test/connectorPlugins/connectors
mkdir src/test/connectorPlugins/connectors

echo "Rebuilding Cosmos DB connectors..."
mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true
cp target/*-jar-with-dependencies.jar src/test/connectorPlugins/connectors
cd src/test/connectorPlugins

echo "Adding custom Insert UUID SMT"
cd connectors
git clone https://github.com/confluentinc/kafka-connect-insert-uuid.git insertuuid -q && cd insertuuid
mvn clean package -DskipTests=true
cp target/*.jar ../
cd .. && rm -rf insertuuid
cd ../
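
Both scripts stage the connector fat jar and the insert-uuid SMT into src/test/connectorPlugins/connectors. A hedged sketch of how such a directory can be surfaced to a Kafka Connect worker container; the image tag and the /kafka/connect mount point are assumptions, not values taken from this diff:

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

public final class ConnectPluginMountSketch {
    public static void main(String[] args) {
        GenericContainer<?> connect =
            new GenericContainer<>(DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.3"))
                // expose the staged jars to the worker's plugin scanner
                .withFileSystemBind("src/test/connectorPlugins/connectors", "/kafka/connect")
                .withEnv("CONNECT_PLUGIN_PATH", "/kafka/connect")
                .withExposedPorts(8083);
        // a real test would also set CONNECT_BOOTSTRAP_SERVERS and the other CONNECT_*
        // variables (see KafkaCosmosConnectContainer below) before calling connect.start()
        System.out.println("configured image: " + connect.getDockerImageName());
    }
}
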
113 changes: 113 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/CosmosDbSourceConnectorITest.java
@@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosSourceConfig;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class CosmosDbSourceConnectorITest extends KafkaCosmosIntegrationTestSuiteBase {
private static final Logger logger = LoggerFactory.getLogger(CosmosDbSourceConnectorITest.class);

// TODO[public preview]: add more integration tests
@Test(groups = { "kafka-integration" }, timeOut = TIMEOUT)
public void readFromSingleContainer() {
Map<String, String> sourceConnectorConfig = new HashMap<>();
sourceConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosDBSourceConnector");
sourceConnectorConfig.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
sourceConnectorConfig.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sourceConnectorConfig.put("kafka.connect.cosmos.applicationName", "Test");
sourceConnectorConfig.put("kafka.connect.cosmos.source.database.name", databaseName);
sourceConnectorConfig.put("kafka.connect.cosmos.source.containers.includeAll", "false");
sourceConnectorConfig.put("kafka.connect.cosmos.source.containers.includedList", singlePartitionContainerName);

// Create topic ahead of time
kafkaCosmosConnectContainer.createTopic(singlePartitionContainerName, 1);

CosmosSourceConfig sourceConfig = new CosmosSourceConfig(sourceConnectorConfig);
CosmosAsyncClient client = CosmosClientStore.getCosmosClient(sourceConfig.getAccountConfig());
CosmosAsyncContainer container = client.getDatabase(databaseName).getContainer(singlePartitionContainerName);

String connectorName = "simpleTest-" + UUID.randomUUID();
try {
// create a few items in the container
logger.info("creating items in container {}", singlePartitionContainerName);
List<String> createdItems = new ArrayList<>();
for (int i = 0; i < 10; i++) {
TestItem testItem = TestItem.createNewItem();
container.createItem(testItem).block();
createdItems.add(testItem.getId());
}

kafkaCosmosConnectContainer.registerConnector(connectorName, sourceConnectorConfig);

logger.info("Getting consumer and subscribe to topic {}", singlePartitionContainerName);
KafkaConsumer<String, JsonNode> kafkaConsumer = kafkaCosmosConnectContainer.getConsumer();
kafkaConsumer.subscribe(
Arrays.asList(
singlePartitionContainerName,
sourceConfig.getMetadataConfig().getMetadataTopicName()));

List<ConsumerRecord<String, JsonNode>> metadataRecords = new ArrayList<>();
List<ConsumerRecord<String, JsonNode>> itemRecords = new ArrayList<>();

Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> {
kafkaConsumer.poll(Duration.ofMillis(1000))
.iterator()
.forEachRemaining(consumerRecord -> {
if (consumerRecord.topic().equals(singlePartitionContainerName)) {
itemRecords.add(consumerRecord);
} else if (consumerRecord.topic().equals(sourceConfig.getMetadataConfig().getMetadataTopicName())) {
metadataRecords.add(consumerRecord);
}
});
return metadataRecords.size() >= 2 && itemRecords.size() >= createdItems.size();
});

// TODO[public preview]: currently the metadata record value is null; populate it with metadata and validate the content here
assertThat(metadataRecords.size()).isEqualTo(2);
assertThat(itemRecords.size()).isEqualTo(createdItems.size());

List<String> receivedItems =
itemRecords.stream().map(consumerRecord -> {
JsonNode jsonNode = consumerRecord.value();
return jsonNode.get("payload").get("id").asText();
}).collect(Collectors.toList());

assertThat(receivedItems.containsAll(createdItems)).isTrue();

} finally {
if (client != null) {
logger.info("cleaning container {}", singlePartitionContainerName);
cleanUpContainer(client, databaseName, singlePartitionContainerName);
client.close();
}

// IMPORTANT: remove the connector after use
if (kafkaCosmosConnectContainer != null) {
kafkaCosmosConnectContainer.deleteConnector(connectorName);
}
}
}
}
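
TestItem is used by the test above but is not part of this diff. A minimal sketch of a compatible POJO; everything beyond id and createNewItem() is an assumption:

import java.util.UUID;

public class TestItem {
    private String id;
    private String mypk; // hypothetical partition-key field

    public TestItem() {
    }

    public static TestItem createNewItem() {
        TestItem item = new TestItem();
        item.id = UUID.randomUUID().toString();
        item.mypk = UUID.randomUUID().toString();
        return item;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMypk() {
        return mypk;
    }

    public void setMypk(String mypk) {
        this.mypk = mypk;
    }
}
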
159 changes: 159 additions & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/KafkaCosmosConnectContainer.java
@@ -0,0 +1,159 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect;

import com.azure.core.exception.ResourceNotFoundException;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.connect.apiclient.Configuration;
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class KafkaCosmosConnectContainer extends GenericContainer<KafkaCosmosConnectContainer> {
private static final Logger logger = LoggerFactory.getLogger(KafkaCosmosConnectContainer.class);
private static final int KAFKA_CONNECT_PORT = 8083;
private KafkaConsumer<String, JsonNode> kafkaConsumer;
private AdminClient adminClient;
private int replicationFactor = 1;

public KafkaCosmosConnectContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
defaultConfig();
}

private void defaultConfig() {
withEnv("CONNECT_GROUP_ID", KafkaCosmosTestConfigurations.CONNECT_GROUP_ID);
withEnv("CONNECT_CONFIG_STORAGE_TOPIC", KafkaCosmosTestConfigurations.CONNECT_CONFIG_STORAGE_TOPIC);
withEnv("CONNECT_OFFSET_STORAGE_TOPIC", KafkaCosmosTestConfigurations.CONNECT_OFFSET_STORAGE_TOPIC);
withEnv("CONNECT_STATUS_STORAGE_TOPIC", KafkaCosmosTestConfigurations.CONNECT_STATUS_STORAGE_TOPIC);
withEnv("CONNECT_KEY_CONVERTER", KafkaCosmosTestConfigurations.CONNECT_KEY_CONVERTER);
withEnv("CONNECT_VALUE_CONVERTER", KafkaCosmosTestConfigurations.CONNECT_VALUE_CONVERTER);
withEnv("CONNECT_PLUGIN_PATH", KafkaCosmosTestConfigurations.CONNECT_PLUGIN_PATH);
withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", KafkaCosmosTestConfigurations.CONNECT_REST_ADVERTISED_HOST_NAME);
withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", KafkaCosmosTestConfigurations.CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR);
withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", KafkaCosmosTestConfigurations.CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR);
withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", KafkaCosmosTestConfigurations.CONNECT_STATUS_STORAGE_REPLICATION_FACTOR);
// withEnv("CONNECT_LOG4J_ROOT_LOGLEVEL", "DEBUG");
// withEnv("CONNECT_LOG4J_LOGGERS", "org.apache.kafka=DEBUG,org.reflections=DEBUG,com.azure.cosmos.kafka=DEBUG");

withExposedPorts(KAFKA_CONNECT_PORT);
}

private Properties defaultConsumerConfig() {
Properties kafkaConsumerProperties = new Properties();
kafkaConsumerProperties.put("group.id", "IntegrationTest");
kafkaConsumerProperties.put("value.deserializer", JsonDeserializer.class.getName());
kafkaConsumerProperties.put("key.deserializer", StringDeserializer.class.getName());
kafkaConsumerProperties.put("sasl.mechanism", "PLAIN");
kafkaConsumerProperties.put("client.dns.lookup", "use_all_dns_ips");
kafkaConsumerProperties.put("session.timeout.ms", "45000");
return kafkaConsumerProperties;
}

public KafkaCosmosConnectContainer withLocalKafkaContainer(final KafkaContainer kafkaContainer) {
withNetwork(kafkaContainer.getNetwork());

withEnv("CONNECT_BOOTSTRAP_SERVERS", kafkaContainer.getNetworkAliases().get(0) + ":9092");
return self();
}

public KafkaCosmosConnectContainer withCloudKafkaContainer() {
withEnv("CONNECT_BOOTSTRAP_SERVERS", KafkaCosmosTestConfigurations.BOOTSTRAP_SERVER);
withEnv("CONNECT_SECURITY_PROTOCOL", "SASL_SSL");
withEnv("CONNECT_SASL_JAAS_CONFIG", KafkaCosmosTestConfigurations.SASL_JAAS);
withEnv("CONNECT_SASL_MECHANISM", "PLAIN");

withEnv("CONNECT_PRODUCER_SECURITY_PROTOCOL", "SASL_SSL");
withEnv("CONNECT_PRODUCER_SASL_JAAS_CONFIG", KafkaCosmosTestConfigurations.SASL_JAAS);
withEnv("CONNECT_PRODUCER_SASL_MECHANISM", "PLAIN");

withEnv("CONNECT_CONSUMER_SECURITY_PROTOCOL", "SASL_SSL");
withEnv("CONNECT_CONSUMER_SASL_JAAS_CONFIG", KafkaCosmosTestConfigurations.SASL_JAAS);
withEnv("CONNECT_CONSUMER_SASL_MECHANISM", "PLAIN");
return self();
}

public KafkaCosmosConnectContainer withLocalBootstrapServer(String localBootstrapServer) {
Properties consumerProperties = defaultConsumerConfig();
consumerProperties.put("bootstrap.servers", localBootstrapServer);
this.kafkaConsumer = new KafkaConsumer<>(consumerProperties);
this.adminClient = this.getAdminClient(localBootstrapServer);
return self();
}

public KafkaCosmosConnectContainer withCloudBootstrapServer() {
Properties consumerProperties = defaultConsumerConfig();
consumerProperties.put("bootstrap.servers", KafkaCosmosTestConfigurations.BOOTSTRAP_SERVER);
consumerProperties.put("sasl.jaas.config", KafkaCosmosTestConfigurations.SASL_JAAS);
consumerProperties.put("security.protocol", "SASL_SSL");
consumerProperties.put("sasl.mechanism", "PLAIN");

this.kafkaConsumer = new KafkaConsumer<>(consumerProperties);
this.adminClient = this.getAdminClient(KafkaCosmosTestConfigurations.BOOTSTRAP_SERVER);
this.replicationFactor = 3;
return self();
}

public void registerConnector(String name, Map<String, String> config) {
NewConnectorDefinition newConnectorDefinition = new NewConnectorDefinition(name, config);
KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));

logger.info("adding kafka connector {}", name);

try {
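// fixed short delay, presumably to let the Connect REST endpoint settle before registering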
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ConnectorDefinition connectorDefinition = kafkaConnectClient.addConnector(newConnectorDefinition);
logger.info("adding kafka connector completed with " + connectorDefinition);
}

public void deleteConnector(String name) {
KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));
try {
kafkaConnectClient.deleteConnector(name);
logger.info("Deleting connector {} succeeded.", name);
} catch (Exception exception) {
if (exception instanceof ResourceNotFoundException) {
logger.info("Connector {} not found.", name);
return;
}

logger.warn("Failed to delete connector {}", name, exception);
}
}

public KafkaConsumer<String, JsonNode> getConsumer() {
return this.kafkaConsumer;
}

public String getTarget() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(KAFKA_CONNECT_PORT);
}

public void createTopic(String topicName, int numPartitions) {
this.adminClient.createTopics(
Arrays.asList(new NewTopic(topicName, numPartitions, (short) replicationFactor)));
}

private AdminClient getAdminClient(String bootstrapServer) {
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServer);
return AdminClient.create(properties);
}
}
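
For context, a hedged sketch of how KafkaCosmosIntegrationTestSuiteBase plausibly wires the broker and the Connect worker together on the local (non-cloud) path; the Connect image name is an assumption:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public final class ContainerWiringSketch {
    public static void main(String[] args) {
        Network network = Network.newNetwork();

        // broker first, so its network alias and host-mapped bootstrap address exist
        KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.3"))
            .withNetwork(network);
        kafka.start();

        // the Connect worker joins the same Docker network and reaches the broker on 9092,
        // while the test-side consumer/admin client use the host-mapped address
        KafkaCosmosConnectContainer connect =
            new KafkaCosmosConnectContainer(DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.3"))
                .withLocalKafkaContainer(kafka)
                .withLocalBootstrapServer(kafka.getBootstrapServers());
        connect.start();

        System.out.println("Connect REST endpoint: " + connect.getTarget());
    }
}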