Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MoveLeaseContainerCreationIntoConnector #561

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
## Release History

### 1.15.0-beta.1 (Unreleased)
#### New Features

#### Key Bug Fixes
* Fixed an issue where only 1 task ran successfully when `CosmosDBSourceConnector` was configured with `maxTasks` larger than `1` - [PR 561](https://github.com/microsoft/kafka-connect-cosmosdb/pull/561)

#### Other Changes

### 1.14.1 (2024-02-29)
#### Key Bug Fixes
* Fixed `NullPointerException` in `CosmosDBSourceConnector`. [PR 555](https://github.com/microsoft/kafka-connect-cosmosdb/pull/555)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.14.1</version>
<version>1.15.0-beta.1</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.kafka.connect.implementations;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.kafka.connect.source.CosmosDBSourceConfig;
import org.apache.commons.lang3.StringUtils;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;

/**
 * Factory for the {@link CosmosAsyncClient} instances used by the source connector and its tasks.
 *
 * <p>Despite the name, nothing is cached here: every call to
 * {@link #getCosmosClient(CosmosDBSourceConfig, String)} builds a new client.
 */
public class CosmosClientStore {
    /**
     * Builds a new async Cosmos client from the given source configuration.
     *
     * <p>The client is configured with the endpoint and key from {@code config}, session
     * consistency, content-response-on-write enabled, the connection sharing flag from
     * {@code config}, and the supplied user agent suffix. Gateway mode is selected only when
     * {@code config.isGatewayModeEnabled()} returns {@code true}; otherwise the builder's
     * default connection mode is left untouched.
     *
     * @param config          source configuration supplying endpoint, key, connection sharing
     *                        and gateway mode settings
     * @param userAgentSuffix suffix passed to the client builder; must be neither {@code null}
     *                        nor empty
     * @return a newly built client (a fresh instance per call)
     * @throws IllegalArgumentException if {@code userAgentSuffix} is {@code null} or empty
     */
    public static CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config, String userAgentSuffix) {
        // isNotEmpty rejects both null and "", so the message has to cover both cases.
        checkArgument(StringUtils.isNotEmpty(userAgentSuffix), "Argument 'userAgentSuffix' can not be null or empty");

        CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
            .endpoint(config.getConnEndpoint())
            .key(config.getConnKey())
            .consistencyLevel(ConsistencyLevel.SESSION)
            .contentResponseOnWriteEnabled(true)
            .connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
            .userAgentSuffix(userAgentSuffix);

        if (config.isGatewayModeEnabled()) {
            cosmosClientBuilder.gatewayMode();
        }

        return cosmosClientBuilder.buildAsyncClient();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class CosmosDBSourceConfig extends CosmosDBConfig {
private static final String COSMOS_USE_LATEST_OFFSET_DISPLAY = "Use latest offset";

static final String COSMOS_ASSIGNED_CONTAINER_CONF = "connect.cosmos.assigned.container";
static final String COSMOS_ASSIGNED_LEASE_CONTAINER_CONF = "connect.cosmos.assigned.lease.container";

static final String COSMOS_WORKER_NAME_CONF = "connect.cosmos.worker.name";
static final String COSMOS_WORKER_NAME_DEFAULT = "worker";
Expand All @@ -80,6 +81,7 @@ public class CosmosDBSourceConfig extends CosmosDBConfig {
// Variables not defined as Connect configs, should not be exposed when creating connector
private String workerName;
private String assignedContainer;
private String assignedLeaseContainer;

public CosmosDBSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
Expand All @@ -98,6 +100,7 @@ public CosmosDBSourceConfig(Map<String, String> parsedConfig) {

// Since variables are not defined as Connect configs, grab values directly from Map
assignedContainer = parsedConfig.get(COSMOS_ASSIGNED_CONTAINER_CONF);
assignedLeaseContainer = parsedConfig.get(COSMOS_ASSIGNED_LEASE_CONTAINER_CONF);
workerName = parsedConfig.get(COSMOS_WORKER_NAME_CONF);
}

Expand Down Expand Up @@ -242,6 +245,10 @@ public String getAssignedContainer() {
return this.assignedContainer;
}

/**
 * Returns the name of the lease container assigned to this task.
 *
 * @return the assigned lease container name as stored on this configuration
 */
public String getAssignedLeaseContainer() {
    return this.assignedLeaseContainer;
}

/**
 * Returns the worker name held by this configuration.
 *
 * @return the worker name stored on this configuration
 *         (NOTE(review): appears to be read directly from the raw config map without a
 *         default, so it may be {@code null} - confirm against all constructors)
 */
public String getWorkerName() {
return this.workerName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@

import java.util.function.Function;
import java.util.stream.Collectors;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
import com.azure.cosmos.kafka.connect.implementations.CosmosClientStore;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.ThroughputProperties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -28,12 +39,20 @@ public class CosmosDBSourceConnector extends SourceConnector {

private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceConnector.class);
private CosmosDBSourceConfig config = null;
private CosmosAsyncClient cosmosClient = null;

@Override
public void start(Map<String, String> props) {
logger.info("Starting the Source Connector");
try {
config = new CosmosDBSourceConfig(props);
this.cosmosClient = CosmosClientStore.getCosmosClient(this.config, this.getUserAgentSuffix());

List<String> containerList = config.getTopicContainerMap().getContainerList();
for (String containerId : containerList) {
createLeaseContainerIfNotExists(cosmosClient, this.config.getDatabaseName(), this.getAssignedLeaseContainer(containerId));
}

} catch (ConfigException e) {
throw new ConnectException(
"Couldn't start CosmosDBSourceConnector due to configuration error", e);
Expand All @@ -59,8 +78,10 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
for (int i = 0; i < maxTasks; i++) {
// Equally distribute workers by assigning workers to containers in round-robin fashion.
Map<String, String> taskProps = config.originalsStrings();
taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_CONTAINER_CONF,
containerList.get(i % containerList.size()));
String assignedContainer = containerList.get(i % containerList.size());

taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_CONTAINER_CONF, assignedContainer);
taskProps.put(CosmosDBSourceConfig.COSMOS_ASSIGNED_LEASE_CONTAINER_CONF, this.getAssignedLeaseContainer(assignedContainer));
taskProps.put(CosmosDBSourceConfig.COSMOS_WORKER_NAME_CONF,
String.format("%s-%d-%d",
CosmosDBSourceConfig.COSMOS_WORKER_NAME_DEFAULT,
Expand All @@ -74,6 +95,9 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
/**
 * Stops the connector and closes the Cosmos client it holds, if one was created.
 */
@Override
public void stop() {
    logger.info("Stopping CosmosDB Source Connector");

    // Nothing to release when no client was ever assigned.
    if (this.cosmosClient == null) {
        return;
    }

    this.cosmosClient.close();
}

@Override
Expand Down Expand Up @@ -101,4 +125,46 @@ public Config validate(Map<String, String> connectorConfigs) {

return config;
}

/**
 * Derives the lease container name for a monitored container by appending {@code -leases}.
 *
 * @param containerName name of the monitored (feed) container
 * @return {@code containerName} followed by {@code -leases}
 */
private String getAssignedLeaseContainer(String containerName) {
    final String leaseSuffix = "-leases";
    return containerName + leaseSuffix;
}

/**
 * Builds the user agent suffix passed to the Cosmos client: the shared suffix constant
 * followed by this connector's version string.
 *
 * @return {@code CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX} concatenated with {@code version()}
 */
private String getUserAgentSuffix() {
    final String connectorVersion = version();
    return CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + connectorVersion;
}

/**
 * Ensures the lease container exists in the given database and returns a reference to it.
 *
 * <p>The container is first read; a {@link CosmosException} with status code 404 is treated
 * as "does not exist" and triggers creation with partition key path {@code /id} and manual
 * throughput of 400. Any other {@link CosmosException} from the read is rethrown unchanged.
 * Both the read and the create call block the calling thread.
 *
 * @param client              client used to reach the database
 * @param databaseName        name of the database that holds (or will hold) the lease container
 * @param leaseCollectionName name of the lease container
 * @return a reference to the lease container
 * @throws CosmosException if the read fails with a status other than 404; failures from the
 *                         create call are logged and rethrown as-is
 */
private CosmosAsyncContainer createLeaseContainerIfNotExists(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
    CosmosAsyncDatabase database = client.getDatabase(databaseName);
    CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName);
    CosmosContainerResponse leaseContainerResponse = null;

    logger.info("Checking whether the lease container exists.");
    try {
        leaseContainerResponse = leaseCollection.read().block();
    } catch (CosmosException ex) {
        // Swallowing exceptions when the type is CosmosException and statusCode is 404
        if (ex.getStatusCode() != 404) {
            throw ex;
        }
        logger.info("Lease container does not exist {}", ex.getMessage());
    }

    // A null response means the read hit the 404 path above (or returned nothing).
    if (leaseContainerResponse == null) {
        logger.info("Creating the Lease container : {}", leaseCollectionName);
        CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
        ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
        CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();

        try {
            database.createContainer(containerSettings, throughputProperties, requestOptions).block();
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so the cause and stack trace are logged.
            logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName, e);
            throw e;
        }
        logger.info("Successfully created new lease container.");
    }

    return database.getContainer(leaseCollectionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,22 @@

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.kafka.connect.CosmosDBConfig;
import com.azure.cosmos.kafka.connect.TopicContainerMap;
import com.azure.cosmos.kafka.connect.implementations.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementations.CosmosKafkaSchedulers;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -69,13 +64,13 @@ public void start(Map<String, String> map) {
this.queue = new LinkedTransferQueue<>();

logger.info("Worker {} Creating the client.", this.config.getWorkerName());
client = getCosmosClient(config);
client = CosmosClientStore.getCosmosClient(this.config, this.getUserAgentSuffix());

// Initialize the database, feed and lease containers
CosmosAsyncDatabase database = client.getDatabase(config.getDatabaseName());
String container = config.getAssignedContainer();
CosmosAsyncContainer feedContainer = database.getContainer(container);
leaseContainer = createNewLeaseContainer(client, config.getDatabaseName(), container + "-leases");
leaseContainer = database.getContainer(this.config.getAssignedLeaseContainer());

// Create source partition map
partitionMap = new HashMap<>();
Expand Down Expand Up @@ -212,34 +207,28 @@ public void stop() {
// NOTE: poll() method and stop() method are both called from the same thread,
// so it is important not to include any changes which may block both places forever
running.set(false);

// Release all the resources.
if (changeFeedProcessor != null) {
changeFeedProcessor.stop().block();
changeFeedProcessor = null;
}

if (this.client != null) {
this.client.close();
}
Mono.just(this)
.flatMap(connectorTask -> {
if (this.changeFeedProcessor != null) {
return this.changeFeedProcessor.stop()
.delayElement(Duration.ofMillis(500)) // delay some time here as the partitionProcessor will release the lease in background
.doOnNext(t -> {
this.changeFeedProcessor = null;
this.safeCloseClient();
});
} else {
this.safeCloseClient();
return Mono.empty();
}
})
.block();
}

private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName());

CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
.endpoint(config.getConnEndpoint())
.key(config.getConnKey())
.consistencyLevel(ConsistencyLevel.SESSION)
.contentResponseOnWriteEnabled(true)
.connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
.userAgentSuffix(getUserAgentSuffix());

if (config.isGatewayModeEnabled()) {
cosmosClientBuilder.gatewayMode();
private void safeCloseClient() {
if (this.client != null) {
this.client.close();
}

return cosmosClientBuilder.buildAsyncClient();
}

private String getUserAgentSuffix() {
Expand Down Expand Up @@ -292,6 +281,7 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
// The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away
// until batch size or maxWaitTime reached. Which can cause CFP to checkpoint faster than kafka batch.
// In order to not move CFP checkpoint faster, we are using shouldFillMoreRecords to control the batch flush.
logger.debug("Transferring document " + this.config.getWorkerName());
this.queue.transfer(document);
} catch (InterruptedException e) {
logger.error("Interrupted! changeFeedReader.", e);
Expand All @@ -307,38 +297,4 @@ protected void handleCosmosDbChanges(List<JsonNode> docs) {
this.shouldFillMoreRecords.set(false);
}
}

/**
 * Returns a reference to the lease container, creating the container first when it does not exist.
 *
 * <p>Existence is probed with a blocking read; a {@link CosmosException} carrying status code
 * 404 means the container is missing, and any other {@link CosmosException} is rethrown. A
 * missing container is created with partition key path {@code /id} and manual throughput of 400.
 *
 * @param client              client used to reach the database
 * @param databaseName        name of the database that holds (or will hold) the lease container
 * @param leaseCollectionName name of the lease container
 * @return a reference to the lease container
 * @throws CosmosException if the read fails with a status other than 404; failures from the
 *                         create call are logged and rethrown as-is
 */
private CosmosAsyncContainer createNewLeaseContainer(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
    CosmosAsyncDatabase database = client.getDatabase(databaseName);
    CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName);
    CosmosContainerResponse leaseContainerResponse = null;

    logger.info("Checking whether the lease container exists.");
    try {
        leaseContainerResponse = leaseCollection.read().block();
    } catch (CosmosException ex) {
        // Swallowing exceptions when the type is CosmosException and statusCode is 404
        if (ex.getStatusCode() != 404) {
            throw ex;
        }
        logger.info("Lease container does not exist {}", ex.getMessage());
    }

    // A null response means the read hit the 404 path above (or returned nothing).
    if (leaseContainerResponse == null) {
        logger.info("Creating the Lease container : {}", leaseCollectionName);
        CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
        ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
        CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();

        try {
            database.createContainer(containerSettings, throughputProperties, requestOptions).block();
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so the cause and stack trace are logged.
            logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName, e);
            throw e;
        }
        logger.info("Successfully created new lease container.");
    }

    return database.getContainer(leaseCollectionName);
}
}
Loading
Loading