From 12c14fa2cc632397e5c690c8bc74bbd9b2a0146d Mon Sep 17 00:00:00 2001 From: annie-mac Date: Fri, 29 Mar 2024 09:15:19 -0700 Subject: [PATCH] refactor --- .../sink/KafkaCosmosPointWriter.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java index 62a731ce81565..cc5013e954b2f 100644 --- a/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java +++ b/sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/KafkaCosmosPointWriter.java @@ -70,8 +70,7 @@ public void writeCore(CosmosAsyncContainer container, List sinkOp private void upsertWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation) { executeWithRetry( (operation) -> { - CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions(); - CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig); + CosmosItemRequestOptions cosmosItemRequestOptions = this.getCosmosItemRequestOptions(); return container.upsertItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then(); }, (throwable) -> false, // no exceptions should be ignored @@ -82,8 +81,7 @@ private void upsertWithRetry(CosmosAsyncContainer container, SinkOperation sinkO private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation) { executeWithRetry( (operation) -> { - CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions(); - CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(cosmosItemRequestOptions, this.throughputControlConfig); + CosmosItemRequestOptions cosmosItemRequestOptions = this.getCosmosItemRequestOptions(); return container.createItem(operation.getSinkRecord().value(), cosmosItemRequestOptions).then(); }, (throwable) -> KafkaCosmosExceptionsHelper.isResourceExistsException(throwable), @@ -94,9 +92,8 @@ private void createWithRetry(CosmosAsyncContainer container, SinkOperation sinkO private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation, String etag) { executeWithRetry( (operation) -> { - CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions(); + CosmosItemRequestOptions itemRequestOptions = this.getCosmosItemRequestOptions(); itemRequestOptions.setIfMatchETag(etag); - CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig); return ImplementationBridgeHelpers .CosmosAsyncContainerHelper @@ -121,7 +118,7 @@ private void replaceIfNotModifiedWithRetry(CosmosAsyncContainer container, SinkO private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkOperation, boolean onlyIfModified) { executeWithRetry( (operation) -> { - CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions(); + CosmosItemRequestOptions itemRequestOptions = this.getCosmosItemRequestOptions(); if (onlyIfModified) { String etag = this.getEtag(operation.getSinkRecord().value()); if (StringUtils.isNotEmpty(etag)) { @@ -129,8 +126,6 @@ private void deleteWithRetry(CosmosAsyncContainer container, SinkOperation sinkO } } - CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig); - return ImplementationBridgeHelpers .CosmosAsyncContainerHelper .getCosmosAsyncContainerAccessor() @@ -198,5 +193,11 @@ private void executeWithRetry( .subscribeOn(KafkaCosmosSchedulers.SINK_BOUNDED_ELASTIC) .block(); } + + private CosmosItemRequestOptions getCosmosItemRequestOptions() { + CosmosItemRequestOptions itemRequestOptions = new CosmosItemRequestOptions(); + CosmosThroughputControlHelper.tryPopulateThroughputControlGroupName(itemRequestOptions, this.throughputControlConfig); + return itemRequestOptions; + } }