Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Mar 29, 2024
1 parent 41d49eb commit 4b486fd
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
/***
* The CosmosDb source connector.
*/
public class CosmosSourceConnector extends SourceConnector {
public class CosmosSourceConnector extends SourceConnector implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosSourceConnector.class);
private CosmosSourceConfig config;
private CosmosAsyncClient cosmosClient;
Expand Down Expand Up @@ -347,4 +347,9 @@ private Map<String, String> getContainersTopicMap(List<CosmosContainerProperties

return effectiveContainersTopicMap;
}

@Override
public void close() {
this.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
/**
* Generic CosmosDb sink write exceptions.
*/
public class CosmosDBWriteException extends ConnectException {
public class CosmosWriteException extends ConnectException {
/**
*
*/
private static final long serialVersionUID = 1L;

public CosmosDBWriteException(String message) {
public CosmosWriteException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void write(CosmosAsyncContainer container, List<SinkRecord> sinkRecords)
writeCore(container, sinkOperations);
} catch (Exception e) {
LOGGER.error("Write failed. ", e);
throw new CosmosDBWriteException(e.getMessage());
throw new CosmosWriteException(e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class CosmosDbSinkConnectorITest extends KafkaCosmosIntegrationTestSuiteBase {
private static final Logger logger = LoggerFactory.getLogger(CosmosDbSinkConnectorITest.class);
public class CosmosSinkConnectorITest extends KafkaCosmosIntegrationTestSuiteBase {
private static final Logger logger = LoggerFactory.getLogger(CosmosSinkConnectorITest.class);

// TODO[public preview]: add more integration tests
@Test(groups = { "kafka-integration"}, timeOut = TIMEOUT)
Expand All @@ -38,7 +38,7 @@ public void sinkToSingleContainer() throws InterruptedException {
// TODO[Public Preview]: add tests for with schema
sinkConnectorConfig.put("value.converter.schemas.enable", "false");
sinkConnectorConfig.put("key.converter", StringConverter.class.getName());
sinkConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosDBSinkConnector");
sinkConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSinkConnector");
sinkConnectorConfig.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
sinkConnectorConfig.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sinkConnectorConfig.put("kafka.connect.cosmos.applicationName", "Test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.CosmosDBSinkConnectorTest.SinkConfigs.ALL_VALID_CONFIGS;
import static com.azure.cosmos.kafka.connect.CosmosSinkConnectorTest.SinkConfigs.ALL_VALID_CONFIGS;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.testng.Assert.assertEquals;

public class CosmosDBSinkConnectorTest extends KafkaCosmosTestSuiteBase {
public class CosmosSinkConnectorTest extends KafkaCosmosTestSuiteBase {
@Test(groups = "unit")
public void taskClass() {
CosmosSinkConnector sinkConnector = new CosmosSinkConnector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class CosmosDbSourceConnectorITest extends KafkaCosmosIntegrationTestSuiteBase {
private static final Logger logger = LoggerFactory.getLogger(CosmosDbSourceConnectorITest.class);
public class CosmosSourceConnectorITest extends KafkaCosmosIntegrationTestSuiteBase {
private static final Logger logger = LoggerFactory.getLogger(CosmosSourceConnectorITest.class);

// TODO[public preview]: add more integration tests
@Test(groups = { "kafka-integration"}, timeOut = TIMEOUT)
public void readFromSingleContainer() {
Map<String, String> sourceConnectorConfig = new HashMap<>();
sourceConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosDBSourceConnector");
sourceConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSourceConnector");
sourceConnectorConfig.put("kafka.connect.cosmos.accountEndpoint", KafkaCosmosTestConfigurations.HOST);
sourceConnectorConfig.put("kafka.connect.cosmos.accountKey", KafkaCosmosTestConfigurations.MASTER_KEY);
sourceConnectorConfig.put("kafka.connect.cosmos.applicationName", "Test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static com.azure.cosmos.kafka.connect.CosmosDBSourceConnectorTest.SourceConfigs.ALL_VALID_CONFIGS;
import static com.azure.cosmos.kafka.connect.CosmosSourceConnectorTest.SourceConfigs.ALL_VALID_CONFIGS;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.testng.Assert.assertEquals;

@Test
public class CosmosDBSourceConnectorTest extends KafkaCosmosTestSuiteBase {
public class CosmosSourceConnectorTest extends KafkaCosmosTestSuiteBase {
@Test(groups = "unit")
public void taskClass() {
CosmosSourceConnector sourceConnector = new CosmosSourceConnector();
Expand Down

0 comments on commit 4b486fd

Please sign in to comment.