Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
annie-mac committed Mar 29, 2024
1 parent 4b486fd commit 8f819f9
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ the main ServiceBusClientBuilder. -->
<suppress checks="ConstantName" files="com.azure.spring.cloud.config.AppConfigurationPropertySourceLocator"/>
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="[/\\]azure-cosmos-kafka-connect[/\\]"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSourceConnector"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosDBSinkConnector"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosSourceConnector"/>
<suppress checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" files="com.azure.cosmos.kafka.connect.CosmosSinkConnector"/>

<!-- Checkstyle suppressions for resource manager package -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.resourcemanager.*"/>
Expand Down
12 changes: 6 additions & 6 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ Licensed under the MIT License.
<revapi.skip>true</revapi.skip>
<!-- Configures the Java 9+ run to perform the required module exports, opens, and reads that are necessary for testing but shouldn't be part of the module-info. -->
<javaModulesSurefireArgLine>
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.sink.idStrategy=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.routing=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.caches=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.faultinjection=ALL-UNNAMED
--add-opens com.azure.cosmos/com.azure.cosmos.implementation.apachecommons.lang=ALL-UNNAMED
--add-opens com.azure.cosmos.kafka.connect/com.azure.cosmos.kafka.connect.implementation.source=com.fasterxml.jackson.databind,ALL-UNNAMED

</javaModulesSurefireArgLine>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {

@Override
public void stop() {
LOGGER.debug("Kafka Cosmos sink connector {} is stopped.");
LOGGER.info("Kafka Cosmos sink connector {} is stopped.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import reactor.core.scheduler.Schedulers;

public class KafkaCosmosSchedulers {
private static final String SINK_BOUNDED_ELASTIC_THREAD_NAME = "sink-bounded-elastic";
private static final String SINK_BOUNDED_ELASTIC_THREAD_NAME = "kafka-cosmos-sink-bounded-elastic";
private static final int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
public static final Scheduler SINK_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public String version() {

@Override
public void start(Map<String, String> props) {
LOGGER.info("Starting the kafka cosmos sink task...");
LOGGER.info("Starting the kafka cosmos sink task");
this.sinkTaskConfig = new CosmosSinkTaskConfig(props);
this.cosmosClient = CosmosClientStore.getCosmosClient(this.sinkTaskConfig.getAccountConfig());
this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig);
Expand Down Expand Up @@ -85,11 +85,9 @@ record -> this.sinkTaskConfig

@Override
public void stop() {
LOGGER.info("Stopping Kafka CosmosDB sink task...");
LOGGER.info("Stopping Kafka CosmosDB sink task");
if (this.cosmosClient != null) {
this.cosmosClient.close();
}

this.cosmosClient = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
* Generic CosmosDb sink write exceptions.
*/
public class CosmosWriteException extends ConnectException {
/**
*
*/
private static final long serialVersionUID = 1L;

public CosmosWriteException(String message) {
Expand Down

0 comments on commit 8f819f9

Please sign in to comment.