diff --git a/CHANGELOG.md b/CHANGELOG.md
index 185b0bf6..c111ca68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 ## Release History
 
+### 1.15.0-beta.1 (Unreleased)
+#### New Features
+
+#### Key Bug Fixes
+* Fixed an issue where only one task would run successfully when `CosmosDBSourceConnector` is configured with `maxTasks` larger than `1` - [PR 561](https://github.com/microsoft/kafka-connect-cosmosdb/pull/561)
+
+#### Other Changes
+
 ### 1.14.1 (2024-02-29)
 #### Key Bug Fixes
 * Fixed `NullPointerException` in `CosmosDBSourceConnector`. [PR 555](https://github.com/microsoft/kafka-connect-cosmosdb/pull/555)
diff --git a/pom.xml b/pom.xml
index 78d12178..1aba590b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
     <groupId>com.azure.cosmos.kafka</groupId>
     <artifactId>kafka-connect-cosmos</artifactId>
-    <version>1.14.1</version>
+    <version>1.15.0-beta.1</version>
     <name>kafka-connect-cosmos</name>
 
     <url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosClientStore.java b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosClientStore.java
new file mode 100644
index 00000000..b1d626f0
--- /dev/null
+++ b/src/main/java/com/azure/cosmos/kafka/connect/implementations/CosmosClientStore.java
@@ -0,0 +1,32 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.kafka.connect.implementations;
+
+import com.azure.cosmos.ConsistencyLevel;
+import com.azure.cosmos.CosmosAsyncClient;
+import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.kafka.connect.source.CosmosDBSourceConfig;
+import org.apache.commons.lang3.StringUtils;
+
+import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
+
+public class CosmosClientStore {
+    public static CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config, String userAgentSuffix) {
+        checkArgument(StringUtils.isNotEmpty(userAgentSuffix), "Argument 'userAgentSuffix' cannot be null or empty");
+
+        CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
+            .endpoint(config.getConnEndpoint())
+            .key(config.getConnKey())
+            .consistencyLevel(ConsistencyLevel.SESSION)
+            .contentResponseOnWriteEnabled(true)
+            .connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
+            .userAgentSuffix(userAgentSuffix);
+
+        if (config.isGatewayModeEnabled()) {
+            cosmosClientBuilder.gatewayMode();
+        }
+
+        return cosmosClientBuilder.buildAsyncClient();
+    }
+}
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConfig.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConfig.java
index 5af36134..f028de98 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConfig.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConfig.java
@@ -64,6 +64,7 @@ public class CosmosDBSourceConfig extends CosmosDBConfig {
     private static final String COSMOS_USE_LATEST_OFFSET_DISPLAY = "Use latest offset";
 
     static final String COSMOS_ASSIGNED_CONTAINER_CONF = "connect.cosmos.assigned.container";
+    static final String COSMOS_ASSIGNED_LEASE_CONTAINER_CONF = "connect.cosmos.assigned.lease.container";
     static final String COSMOS_WORKER_NAME_CONF = "connect.cosmos.worker.name";
     static final String COSMOS_WORKER_NAME_DEFAULT = "worker";
 
@@ -80,6 +81,7 @@ public class CosmosDBSourceConfig extends CosmosDBConfig {
     // Variables not defined as Connect configs, should not be exposed when creating connector
     private String workerName;
     private String assignedContainer;
+    private String assignedLeaseContainer;
 
     public CosmosDBSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
@@ -98,6 +100,7 @@ public CosmosDBSourceConfig(Map<String, String> parsedConfig) {
 
         // Since variables are not defined as Connect configs, grab values directly from Map
         assignedContainer = parsedConfig.get(COSMOS_ASSIGNED_CONTAINER_CONF);
+        assignedLeaseContainer = parsedConfig.get(COSMOS_ASSIGNED_LEASE_CONTAINER_CONF);
         workerName = parsedConfig.get(COSMOS_WORKER_NAME_CONF);
     }
 
@@ -242,6 +245,10 @@ public String getAssignedContainer() {
         return this.assignedContainer;
     }
 
+    public String getAssignedLeaseContainer() {
+        return assignedLeaseContainer;
+    }
+
     public String getWorkerName() {
         return this.workerName;
     }
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnector.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnector.java
index 21f19253..0dc5615a 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnector.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnector.java
@@ -8,6 +8,17 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+
+import com.azure.cosmos.CosmosAsyncClient;
+import com.azure.cosmos.CosmosAsyncContainer;
+import com.azure.cosmos.CosmosAsyncDatabase;
+import com.azure.cosmos.CosmosException;
+import com.azure.cosmos.kafka.connect.CosmosDBConfig;
+import com.azure.cosmos.kafka.connect.implementations.CosmosClientStore;
+import com.azure.cosmos.models.CosmosContainerProperties;
+import com.azure.cosmos.models.CosmosContainerRequestOptions;
+import com.azure.cosmos.models.CosmosContainerResponse;
+import com.azure.cosmos.models.ThroughputProperties;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
@@ -28,12 +39,20 @@ public class CosmosDBSourceConnector extends SourceConnector {
     private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceConnector.class);
 
     private CosmosDBSourceConfig config = null;
+    private CosmosAsyncClient cosmosClient = null;
 
     @Override
     public void start(Map<String, String> props) {
         logger.info("Starting the Source Connector");
         try {
             config = new CosmosDBSourceConfig(props);
+            this.cosmosClient = CosmosClientStore.getCosmosClient(this.config, this.getUserAgentSuffix());
+
+            List<String> containerList = config.getTopicContainerMap().getContainerList();
+            for (String containerId : containerList) {
+                createLeaseContainerIfNotExists(cosmosClient, this.config.getDatabaseName(), this.getAssignedLeaseContainer(containerId));
+            }
+
         } catch (ConfigException e) {
             throw new ConnectException(
                     "Couldn't start CosmosDBSourceConnector due to configuration error", e);
@@ -59,8 +78,10 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
         for (int i = 0; i < maxTasks; i++) {
             // Equally distribute workers by assigning workers to containers in round-robin fashion.
             Map<String, String> taskProps = config.originalsStrings();
-            taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_CONTAINER_CONF,
-                    containerList.get(i % containerList.size()));
+            String assignedContainer = containerList.get(i % containerList.size());
+
+            taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_CONTAINER_CONF, assignedContainer);
+            taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_LEASE_CONTAINER_CONF, this.getAssignedLeaseContainer(assignedContainer));
             taskProps.put(CosmosDBSourceConfig.COSMOS_WORKER_NAME_CONF,
                     String.format("%s-%d-%d",
                             CosmosDBSourceConfig.COSMOS_WORKER_NAME_DEFAULT,
@@ -74,6 +95,9 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
     @Override
     public void stop() {
         logger.info("Stopping CosmosDB Source Connector");
+        if (this.cosmosClient != null) {
+            this.cosmosClient.close();
+        }
     }
 
     @Override
@@ -101,4 +125,46 @@ public Config validate(Map<String, String> connectorConfigs) {
 
         return config;
     }
+
+    private String getAssignedLeaseContainer(String containerName) {
+        return containerName + "-leases";
+    }
+
+    private String getUserAgentSuffix() {
+        return CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version();
+    }
+
+    private CosmosAsyncContainer createLeaseContainerIfNotExists(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
+        CosmosAsyncDatabase database = client.getDatabase(databaseName);
+        CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName);
+        CosmosContainerResponse leaseContainerResponse = null;
+
+        logger.info("Checking whether the lease container exists.");
+        try {
+            leaseContainerResponse = leaseCollection.read().block();
+        } catch (CosmosException ex) {
+            // Swallowing exceptions when the type is CosmosException and statusCode is 404
+            if (ex.getStatusCode() != 404) {
+                throw ex;
+            }
+            logger.info("Lease container does not exist {}", ex.getMessage());
+        }
+
+        if (leaseContainerResponse == null) {
+            logger.info("Creating the Lease container : {}", leaseCollectionName);
+            CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
+            ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
+            CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();
+
+            try {
+                database.createContainer(containerSettings, throughputProperties, requestOptions).block();
+            } catch (Exception e) {
+                logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName);
+                throw e;
+            }
+            logger.info("Successfully created new lease container.");
+        }
+
+        return database.getContainer(leaseCollectionName);
+    }
 }
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
index 39e8f646..f2d0b5d5 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
@@ -5,20 +5,14 @@
 import com.azure.cosmos.ChangeFeedProcessor;
 import com.azure.cosmos.ChangeFeedProcessorBuilder;
-import com.azure.cosmos.ConsistencyLevel;
 import com.azure.cosmos.CosmosAsyncClient;
 import com.azure.cosmos.CosmosAsyncContainer;
 import com.azure.cosmos.CosmosAsyncDatabase;
-import com.azure.cosmos.CosmosClientBuilder;
-import com.azure.cosmos.CosmosException;
 import com.azure.cosmos.kafka.connect.CosmosDBConfig;
 import com.azure.cosmos.kafka.connect.TopicContainerMap;
+import com.azure.cosmos.kafka.connect.implementations.CosmosClientStore;
 import com.azure.cosmos.kafka.connect.implementations.CosmosKafkaSchedulers;
 import com.azure.cosmos.models.ChangeFeedProcessorOptions;
-import com.azure.cosmos.models.CosmosContainerProperties;
-import com.azure.cosmos.models.CosmosContainerRequestOptions;
-import com.azure.cosmos.models.CosmosContainerResponse;
-import com.azure.cosmos.models.ThroughputProperties;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -26,6 +20,7 @@
 import org.apache.kafka.connect.source.SourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -69,13 +64,13 @@ public void start(Map<String, String> map) {
         this.queue = new LinkedTransferQueue<>();
 
         logger.info("Worker {} Creating the client.", this.config.getWorkerName());
-        client = getCosmosClient(config);
+        client = CosmosClientStore.getCosmosClient(this.config, this.getUserAgentSuffix());
 
         // Initialize the database, feed and lease containers
         CosmosAsyncDatabase database = client.getDatabase(config.getDatabaseName());
         String container = config.getAssignedContainer();
         CosmosAsyncContainer feedContainer = database.getContainer(container);
-        leaseContainer = createNewLeaseContainer(client, config.getDatabaseName(), container + "-leases");
+        leaseContainer = database.getContainer(this.config.getAssignedLeaseContainer());
 
         // Create source partition map
         partitionMap = new HashMap<>();
@@ -212,34 +207,28 @@ public void stop() {
         // NOTE: poll() method and stop() method are both called from the same thread,
         // so it is important not to include any changes which may block both places forever
         running.set(false);
-
-        // Release all the resources.
-        if (changeFeedProcessor != null) {
-            changeFeedProcessor.stop().block();
-            changeFeedProcessor = null;
-        }
-        if (this.client != null) {
-            this.client.close();
-        }
+        Mono.just(this)
+            .flatMap(connectorTask -> {
+                if (this.changeFeedProcessor != null) {
+                    return this.changeFeedProcessor.stop()
+                        .delayElement(Duration.ofMillis(500)) // delay some time here as the partitionProcessor will release the lease in the background
+                        .doOnNext(t -> {
+                            this.changeFeedProcessor = null;
+                            this.safeCloseClient();
+                        });
+                } else {
+                    this.safeCloseClient();
+                    return Mono.empty();
+                }
+            })
+            .block();
     }
 
-    private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
-        logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName());
-
-        CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
-                .endpoint(config.getConnEndpoint())
-                .key(config.getConnKey())
-                .consistencyLevel(ConsistencyLevel.SESSION)
-                .contentResponseOnWriteEnabled(true)
-                .connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
-                .userAgentSuffix(getUserAgentSuffix());
-
-        if (config.isGatewayModeEnabled()) {
-            cosmosClientBuilder.gatewayMode();
+    private void safeCloseClient() {
+        if (this.client != null) {
+            this.client.close();
         }
-
-        return cosmosClientBuilder.buildAsyncClient();
     }
 
     private String getUserAgentSuffix() {
@@ -292,6 +281,7 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
                     // The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away
                     // until batch size or maxWaitTime reached. Which can cause CFP to checkpoint faster than kafka batch.
                     // In order to not move CFP checkpoint faster, we are using shouldFillMoreRecords to control the batch flush.
+ logger.debug("Transferring document " + this.config.getWorkerName()); this.queue.transfer(document); } catch (InterruptedException e) { logger.error("Interrupted! changeFeedReader.", e); @@ -307,38 +297,4 @@ protected void handleCosmosDbChanges(List docs) { this.shouldFillMoreRecords.set(false); } } - - private CosmosAsyncContainer createNewLeaseContainer(CosmosAsyncClient client, String databaseName, String leaseCollectionName) { - CosmosAsyncDatabase database = client.getDatabase(databaseName); - CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName); - CosmosContainerResponse leaseContainerResponse = null; - - logger.info("Checking whether the lease container exists."); - try { - leaseContainerResponse = leaseCollection.read().block(); - } catch (CosmosException ex) { - // Swallowing exceptions when the type is CosmosException and statusCode is 404 - if (ex.getStatusCode() != 404) { - throw ex; - } - logger.info("Lease container does not exist {}", ex.getMessage()); - } - - if (leaseContainerResponse == null) { - logger.info("Creating the Lease container : {}", leaseCollectionName); - CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id"); - ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400); - CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions(); - - try { - database.createContainer(containerSettings, throughputProperties, requestOptions).block(); - } catch (Exception e) { - logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName); - throw e; - } - logger.info("Successfully created new lease container."); - } - - return database.getContainer(leaseCollectionName); - } } diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnectorTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnectorTest.java index 2bf1af8e..88483dfa 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnectorTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceConnectorTest.java @@ -3,15 +3,28 @@ package com.azure.cosmos.kafka.connect.source; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosAsyncDatabase; +import com.azure.cosmos.kafka.connect.implementations.CosmosClientStore; +import com.azure.cosmos.models.CosmosContainerResponse; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigValue; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; import java.util.Set; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; /** * Tests the configuration of Source Connector @@ -22,6 +35,22 @@ public class CosmosDBSourceConnectorTest { private static final String BATCH_SETTING_NAME = CosmosDBSourceConfig.COSMOS_SOURCE_TASK_BATCH_SIZE_CONF; private static final Long BATCH_SETTING = new CosmosDBSourceConfig(CosmosDBSourceConfigTest.setupConfigs()).getTaskBatchSize(); + @BeforeClass + public static void setup() { + MockedStatic clientStoreMock = 
+        CosmosAsyncClient clientMock = Mockito.mock(CosmosAsyncClient.class);
+        clientStoreMock.when(() -> CosmosClientStore.getCosmosClient(any(), any())).thenReturn(clientMock);
+
+        CosmosAsyncDatabase databaseMock = Mockito.mock(CosmosAsyncDatabase.class);
+        Mockito.when(clientMock.getDatabase(anyString())).thenReturn(databaseMock);
+
+        CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
+        Mockito.when(databaseMock.getContainer(anyString())).thenReturn(containerMock);
+
+        CosmosContainerResponse containerResponseMock = Mockito.mock(CosmosContainerResponse.class);
+        Mockito.when(containerMock.read()).thenReturn(Mono.just(containerResponseMock));
+    }
+
     @Test
     public void testConfig(){
         ConfigDef configDef = new CosmosDBSourceConnector().config();
@@ -61,7 +90,7 @@ public void testNumericValidation(){
     }
 
     @Test
-    public void testTaskConfigs(){
+    public void testTaskConfigs() {
         Map<String, String> settingAssignment = CosmosDBSourceConfigTest.setupConfigs();
         CosmosDBSourceConnector sourceConnector = new CosmosDBSourceConnector();
         sourceConnector.start(settingAssignment);
diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
index 9b553634..980662d4 100644
--- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
+++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
@@ -3,19 +3,6 @@
 
 package com.azure.cosmos.kafka.connect.source;
 
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import com.azure.cosmos.CosmosAsyncClient;
 import com.azure.cosmos.CosmosAsyncContainer;
 import com.azure.cosmos.CosmosAsyncDatabase;
@@ -23,7 +10,6 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.Assert;
@@ -31,6 +17,18 @@
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
 public class CosmosDBSourceTaskTest {
     private CosmosDBSourceTask testTask;
     private final String topicName = "testtopic";
@@ -54,6 +52,7 @@ public void setup() throws IllegalAccessException {
         sourceSettings.put(CosmosDBSourceConfig.COSMOS_CONTAINER_TOPIC_MAP_CONF, topicName + "#" + containerName);
         sourceSettings.put(CosmosDBSourceConfig.COSMOS_DATABASE_NAME_CONF, databaseName);
         sourceSettings.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_CONTAINER_CONF, containerName);
+        sourceSettings.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_LEASE_CONTAINER_CONF, containerName + "-leases");
         sourceSettings.put(CosmosDBSourceConfig.COSMOS_SOURCE_TASK_POLL_INTERVAL_CONF, "500");
         sourceSettings.put(CosmosDBSourceConfig.COSMOS_SOURCE_TASK_BATCH_SIZE_CONF, "1");
         sourceSettings.put(CosmosDBSourceConfig.COSMOS_SOURCE_TASK_BUFFER_SIZE_CONF, "5000");
@@ -94,7 +93,6 @@ public void setup() throws IllegalAccessException {
 
         FieldUtils.writeField(testTask, "client", mockCosmosClient, true);
         FieldUtils.writeField(testTask, "leaseContainer", mockLeaseContainer, true);
-
     }
 
     @Test(expected = IllegalArgumentException.class)
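
Reviewer note (illustration, not part of the patch): the fix works by having the connector, rather than each task, create the lease containers and hand every task both a feed container and a matching lease container name via the new `connect.cosmos.assigned.lease.container` property, distributing containers across tasks in round-robin order. The standalone sketch below replays that assignment rule so the expected task layout is easy to check; the container names and task count are made up for the example, and only the two property keys are taken from the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinAssignmentSketch {
    public static void main(String[] args) {
        // Hypothetical feed containers and maxTasks value, for illustration only.
        List<String> containerList = Arrays.asList("orders", "customers");
        int maxTasks = 3;

        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            // Same round-robin rule as taskConfigs(): task i gets container (i % containerCount).
            String assignedContainer = containerList.get(i % containerList.size());
            Map<String, String> taskProps = new HashMap<>();
            taskProps.put("connect.cosmos.assigned.container", assignedContainer);
            // Each feed container is paired with a "<container>-leases" lease container.
            taskProps.put("connect.cosmos.assigned.lease.container", assignedContainer + "-leases");
            taskConfigs.add(taskProps);
        }

        // Prints:
        // task 0 -> orders / orders-leases
        // task 1 -> customers / customers-leases
        // task 2 -> orders / orders-leases
        for (int i = 0; i < taskConfigs.size(); i++) {
            Map<String, String> props = taskConfigs.get(i);
            System.out.println("task " + i + " -> "
                    + props.get("connect.cosmos.assigned.container") + " / "
                    + props.get("connect.cosmos.assigned.lease.container"));
        }
    }
}

When maxTasks exceeds the number of containers, several tasks end up sharing one feed container and therefore one pre-created lease container, which is how the change feed processor can split partitions among multiple workers.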