From 1526007a99f28a7ba922046224df0a777a83978c Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Tue, 14 Jan 2025 16:02:39 -0800 Subject: [PATCH 01/11] Change feed missing changes --- .../IncrementalChangeFeedProcessorTest.java | 81 ++++++++++++++ .../IncrementalChangeFeedProcessorTest.java | 105 ++++++++++++++++++ .../epkversion/PartitionProcessorImpl.java | 3 +- 3 files changed, 188 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index c9693cc619315..a395eea7ebfa0 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -19,6 +19,7 @@ import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.TestConfigurations; @@ -44,6 +45,7 @@ import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; import com.azure.cosmos.test.faultinjection.FaultInjectionRule; @@ -97,6 +99,8 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase { private final static Logger logger = LoggerFactory.getLogger(IncrementalChangeFeedProcessorTest.class); private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper(); + private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor clientAccessor = + ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); private CosmosAsyncDatabase createdDatabase; private final String hostName = RandomStringUtils.randomAlphabetic(6); @@ -200,6 +204,83 @@ public void readFeedDocumentsStartFromBeginning() throws InterruptedException { } } + @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) + public void exhaustedRUs() throws InterruptedException { + CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); + + try { + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 100); + +// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders +// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) +// .build(); +// +// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); +// +// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() +// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) +// .operationType(FaultInjectionOperationType.REPLACE_ITEM) +// .build(); +// +// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder +// .condition(faultInjectionConditionForRegion) +// .result(serverErrorResult) +// .duration(Duration.ofSeconds(13)) +// .startDelay(Duration.ofSeconds(2)) +// .build(); +// +// CosmosFaultInjectionHelper +// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) +// .block(); + + changeFeedProcessor = new ChangeFeedProcessorBuilder() + .hostName(hostName) + .handleLatestVersionChanges(changeFeedProcessorHandler(receivedDocuments)) + .feedContainer(createdFeedCollection) + .leaseContainer(createdLeaseCollection) + .options(new ChangeFeedProcessorOptions() + .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later + .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later + .setFeedPollDelay(Duration.ofSeconds(2)) + .setLeasePrefix("TEST") + .setMaxItemCount(10) + .setStartFromBeginning(true) + ) + .buildChangeFeedProcessor(); + + try { + changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout + .subscribe(); + } catch (Exception ex) { + logger.error("Change feed processor did not start in the expected time", ex); + throw ex; + } + + // Wait for the feed processor to receive and process the documents. + Thread.sleep(6 * CHANGE_FEED_PROCESSOR_TIMEOUT); + + assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); + + changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout + + for (InternalObjectNode item : createdDocuments) { + assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); + } + + // Wait for the feed processor to shutdown. + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + } finally { + safeDeleteCollection(createdFeedCollection); + safeDeleteCollection(createdLeaseCollection); + + // Allow some time for the collections to be deleted before exiting. + Thread.sleep(500); + } + } + @Test(groups = { "query" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void readFeedDocumentsStartFromCustomDate() throws InterruptedException { CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 0f0ab04840e1d..0363748fb427e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -4,6 +4,7 @@ import com.azure.cosmos.ChangeFeedProcessor; import com.azure.cosmos.ChangeFeedProcessorBuilder; +import com.azure.cosmos.ConnectionMode; import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; @@ -16,6 +17,7 @@ import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.Utils; @@ -34,6 +36,16 @@ import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.rx.TestSuiteBase; +import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; +import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; +import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResult; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -75,6 +87,8 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase { private final static Logger log = LoggerFactory.getLogger(IncrementalChangeFeedProcessorTest.class); private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper(); + private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor clientAccessor = + ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); private CosmosAsyncDatabase createdDatabase; // private final String databaseId = "testdb1"; @@ -191,6 +205,97 @@ public void readFeedDocumentsStartFromBeginning() throws InterruptedException { } } + @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) + public void exhaustedRUs() throws InterruptedException { + CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); + + try { + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 300); + +// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders +// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) +// .build(); +// +// FaultInjectionServerErrorResult serverErrorResult2 = FaultInjectionResultBuilders +// .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) +// .delay(Duration.ofSeconds(6)) +// .build(); +// +// +// +// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); +// +// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() +// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) +// .operationType(FaultInjectionOperationType.REPLACE_ITEM) +// .build(); +// +// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder +// .condition(faultInjectionConditionForRegion) +// .result(serverErrorResult) +// .startDelay(Duration.ofSeconds(2)) +// .build(); +// +// FaultInjectionRule faultInjectionRule2 = faultInjectionRuleBuilder +// .condition(faultInjectionConditionForRegion) +// .result(serverErrorResult2) +// .duration(Duration.ofSeconds(3)) +// .startDelay(Duration.ofSeconds(2)) +// .build(); +// +// CosmosFaultInjectionHelper +// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule, faultInjectionRule2)) +// .block(); + + changeFeedProcessor = new ChangeFeedProcessorBuilder() + .hostName(hostName) + .handleChanges(changeFeedProcessorHandler(receivedDocuments)) + .feedContainer(createdFeedCollection) + .leaseContainer(createdLeaseCollection) + .options(new ChangeFeedProcessorOptions() + .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later + .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later + .setFeedPollDelay(Duration.ofSeconds(2)) + .setLeasePrefix("TEST") + .setMaxItemCount(10) + .setStartFromBeginning(true) + ) + .buildChangeFeedProcessor(); + + try { + changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout + .subscribe(); + } catch (Exception ex) { + logger.error("Change feed processor did not start in the expected time", ex); + throw ex; + } + + // Wait for the feed processor to receive and process the documents. + Thread.sleep(6 * CHANGE_FEED_PROCESSOR_TIMEOUT); + + assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); + + changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout + + for (InternalObjectNode item : createdDocuments) { + assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); + } + + // Wait for the feed processor to shutdown. + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + } finally { + safeDeleteCollection(createdFeedCollection); + safeDeleteCollection(createdLeaseCollection); + + // Allow some time for the collections to be deleted before exiting. + Thread.sleep(500); + } + } + + @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void readFeedDocumentsStartFromCustomDate() throws InterruptedException { CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index 91c8dcb2d1466..f13a355ab10e9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -120,7 +120,7 @@ public Mono run(CancellationToken cancellationToken) { this.settings.getCollectionSelfLink(), this.options, itemType) - .limitRequest(1); + .take(1, false); }) .flatMap(documentFeedResponse -> { if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); @@ -229,6 +229,7 @@ public Mono run(CancellationToken cancellationToken) { case TRANSIENT_ERROR: { // Retry on transient (429) errors if (clientException.getRetryAfterDuration().toMillis() > 0) { + System.out.println("Retrying 429"); Instant stopTimer = Instant.now().plus(clientException.getRetryAfterDuration().toMillis(), MILLIS); return Mono.just(clientException.getRetryAfterDuration().toMillis()) // set some seed value to be able to run // the repeat loop From 030a840ce4924a17f82c692d151edceda5cb09c2 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Thu, 16 Jan 2025 13:08:11 -0800 Subject: [PATCH 02/11] Debugging logs and cloning solution --- .../IncrementalChangeFeedProcessorTest.java | 6 +-- .../IncrementalChangeFeedProcessorTest.java | 46 ++++++++----------- .../com/azure/cosmos/CosmosException.java | 2 +- .../CosmosChangeFeedRequestOptionsImpl.java | 12 ++++- .../common/ChangeFeedContextClientImpl.java | 22 ++++++--- .../changefeed/common/ChangeFeedStateV1.java | 20 ++++++++ .../epkversion/PartitionProcessorImpl.java | 11 +++-- .../pkversion/PartitionProcessorImpl.java | 10 +++- .../query/ChangeFeedFetcher.java | 8 ++++ .../cosmos/implementation/query/Fetcher.java | 5 +- .../implementation/query/Paginator.java | 2 + 11 files changed, 96 insertions(+), 48 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index a395eea7ebfa0..59588f4fa2871 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -228,8 +228,8 @@ public void exhaustedRUs() throws InterruptedException { // FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder // .condition(faultInjectionConditionForRegion) // .result(serverErrorResult) -// .duration(Duration.ofSeconds(13)) -// .startDelay(Duration.ofSeconds(2)) +// .duration(Duration.ofSeconds(10)) +// .startDelay(Duration.ofSeconds(3)) // .build(); // // CosmosFaultInjectionHelper @@ -260,7 +260,7 @@ public void exhaustedRUs() throws InterruptedException { } // Wait for the feed processor to receive and process the documents. - Thread.sleep(6 * CHANGE_FEED_PROCESSOR_TIMEOUT); + Thread.sleep(8 * CHANGE_FEED_PROCESSOR_TIMEOUT); assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 0363748fb427e..3acce47095c13 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -76,6 +76,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -213,19 +214,11 @@ public void exhaustedRUs() throws InterruptedException { try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 300); // FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders // .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) // .build(); // -// FaultInjectionServerErrorResult serverErrorResult2 = FaultInjectionResultBuilders -// .getResultBuilder(FaultInjectionServerErrorType.RESPONSE_DELAY) -// .delay(Duration.ofSeconds(6)) -// .build(); -// -// -// // FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); // // FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() @@ -236,20 +229,18 @@ public void exhaustedRUs() throws InterruptedException { // FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder // .condition(faultInjectionConditionForRegion) // .result(serverErrorResult) -// .startDelay(Duration.ofSeconds(2)) -// .build(); -// -// FaultInjectionRule faultInjectionRule2 = faultInjectionRuleBuilder -// .condition(faultInjectionConditionForRegion) -// .result(serverErrorResult2) -// .duration(Duration.ofSeconds(3)) -// .startDelay(Duration.ofSeconds(2)) +// .duration(Duration.ofSeconds(10)) +// .startDelay(Duration.ofSeconds(3)) // .build(); // // CosmosFaultInjectionHelper -// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule, faultInjectionRule2)) +// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) // .block(); + CompletableFuture setupDocumentsFuture = CompletableFuture.runAsync(() -> { + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 200); + }); + changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) .handleChanges(changeFeedProcessorHandler(receivedDocuments)) @@ -265,20 +256,23 @@ public void exhaustedRUs() throws InterruptedException { ) .buildChangeFeedProcessor(); - try { - changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout - .subscribe(); - } catch (Exception ex) { - logger.error("Change feed processor did not start in the expected time", ex); - throw ex; - } + CompletableFuture startProcessorFuture = CompletableFuture.runAsync(() -> { + try { + changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()).subscribe(); + } catch (Exception ex) { + logger.error("Change feed processor did not start in the expected time", ex); + throw new RuntimeException(ex); + } + }); + + CompletableFuture.allOf(setupDocumentsFuture, startProcessorFuture).join(); // Wait for the feed processor to receive and process the documents. - Thread.sleep(6 * CHANGE_FEED_PROCESSOR_TIMEOUT); + Thread.sleep(10 * CHANGE_FEED_PROCESSOR_TIMEOUT); assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); - changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout + changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); for (InternalObjectNode item : createdDocuments) { assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java index 0cf4870cf485b..6bd340fc20bb8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java @@ -205,7 +205,7 @@ protected CosmosException(int statusCode, Exception innerException) { * @param cosmosErrorResource the error resource object. * @param responseHeaders the response headers. */ - protected CosmosException(int statusCode, CosmosError cosmosErrorResource, Map responseHeaders) { + public CosmosException(int statusCode, CosmosError cosmosErrorResource, Map responseHeaders) { this(/* resourceAddress */ null, statusCode, cosmosErrorResource, responseHeaders); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index 4bbdf764cce38..c7346570ec275 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -11,6 +11,7 @@ import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple; import com.azure.cosmos.models.CosmosRequestOptions; @@ -18,6 +19,8 @@ import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.util.Beta; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -50,9 +53,11 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ private Set keywordIdentifiers; private boolean completeAfterAllCurrentChangesRetrieved; private Long endLSN; + private static final Logger logger = LoggerFactory.getLogger(CosmosChangeFeedRequestOptionsImpl.class); public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { - this.continuationState = toBeCloned.continuationState; +// this.continuationState = toBeCloned.continuationState; + this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) toBeCloned.continuationState); this.feedRangeInternal = toBeCloned.feedRangeInternal; this.properties = toBeCloned.properties; this.maxItemCount = toBeCloned.maxItemCount; @@ -72,6 +77,8 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB this.keywordIdentifiers = toBeCloned.keywordIdentifiers; this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved; this.endLSN = toBeCloned.endLSN; + + logger.warn("Cloning options with continuation state {}", continuationState.getContinuation()); } public CosmosChangeFeedRequestOptionsImpl( @@ -93,7 +100,7 @@ public CosmosChangeFeedRequestOptionsImpl( this.maxPrefetchPageCount = DEFAULT_MAX_PREFETCH_PAGE_COUNT; this.feedRangeInternal = feedRange; this.startFromInternal = startFromInternal; - this.continuationState = continuationState; + this.continuationState = continuationState; // this should be a clone if (mode != ChangeFeedMode.INCREMENTAL && mode != ChangeFeedMode.FULL_FIDELITY) { throw new IllegalArgumentException( @@ -110,6 +117,7 @@ public CosmosChangeFeedRequestOptionsImpl( this.properties = new HashMap<>(); this.isSplitHandlingDisabled = false; this.completeAfterAllCurrentChangesRetrieved = DEFAULT_COMPLETE_AFTER_ALL_CURRENT_CHANGES_RETRIEVED; + logger.warn("Creating options with continuation state {}", continuationState.getContinuation()); } public ChangeFeedState getContinuation() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java index 9b3c58966495d..24e8b808315b7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java @@ -7,12 +7,8 @@ import com.azure.cosmos.CosmosAsyncDatabase; import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.implementation.AsyncDocumentClient; -import com.azure.cosmos.implementation.ChangeFeedOperationState; -import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; -import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionKeyRange; -import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.models.CosmosBulkOperationResponse; @@ -41,11 +37,14 @@ import java.net.URI; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import static com.azure.cosmos.CosmosBridgeInternal.getContextClient; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; +import static java.lang.Thread.sleep; /** * Implementation for ChangeFeedDocumentClient. @@ -58,6 +57,7 @@ public class ChangeFeedContextClientImpl implements ChangeFeedContextClient { private Scheduler scheduler; private static final ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.CosmosAsyncDatabaseAccessor cosmosAsyncDatabaseAccessor = ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor(); + private AtomicInteger count = new AtomicInteger(0); /** * Initializes a new instance of the {@link ChangeFeedContextClient} interface. @@ -200,8 +200,16 @@ public Flux> deleteAllItems(List Mono> replaceItem(String itemId, PartitionKey partitionKey, T document, CosmosItemRequestOptions options) { - return cosmosContainer.replaceItem(document, itemId, partitionKey, options) - .publishOn(this.scheduler); + if (count.compareAndSet(1, 2)) { + Map map = new HashMap<>(); + map.put("x-ms-retry-after-ms", "1000"); + throw new com.azure.cosmos.CosmosException(429, new com.azure.cosmos.implementation.CosmosError(), map); + + } else { + count.incrementAndGet(); + return cosmosContainer.replaceItem(document, itemId, partitionKey, options) + .publishOn(this.scheduler); + } } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java index 4311f1631462f..33363029ec570 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java @@ -10,12 +10,17 @@ import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.query.CompositeContinuationToken; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; public class ChangeFeedStateV1 extends ChangeFeedState { + private static final Logger logger = LoggerFactory.getLogger(ChangeFeedStateV1.class); private final String containerRid; private final FeedRangeInternal feedRange; private final ChangeFeedMode mode; @@ -38,6 +43,20 @@ public ChangeFeedStateV1( this.mode = mode; } + public ChangeFeedStateV1(ChangeFeedStateV1 toBeCloned) { + this.containerRid = toBeCloned.containerRid; + this.feedRange = toBeCloned.feedRange; + this.startFromSettings = toBeCloned.startFromSettings; + if (toBeCloned.continuation != null) { + List compositeContinuationTokens = new ArrayList<>(); + compositeContinuationTokens.addAll(toBeCloned.continuation.getCompositeContinuationTokens()); + this.continuation = FeedRangeContinuation.create(toBeCloned.continuation.getContainerRid(), toBeCloned.continuation.getFeedRange(), compositeContinuationTokens); + } else { + this.continuation = null; + } + this.mode = toBeCloned.mode; + } + @Override public FeedRangeContinuation getContinuation() { return this.continuation; @@ -47,6 +66,7 @@ public FeedRangeContinuation getContinuation() { public ChangeFeedState setContinuation(FeedRangeContinuation continuation) { checkNotNull(continuation, "Argument 'continuation' must not be null."); continuation.validateContainer(this.containerRid); + logger.warn("Setting continuation: {}", this.continuation); this.continuation = continuation; return this; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index f13a355ab10e9..47dcc9791965a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -120,9 +120,14 @@ public Mono run(CancellationToken cancellationToken) { this.settings.getCollectionSelfLink(), this.options, itemType) - .take(1, false); - }) + .take(1, true).log("limitRequest"); + }, 1, 1) .flatMap(documentFeedResponse -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); final String continuationToken = documentFeedResponse.getContinuationToken(); @@ -229,7 +234,7 @@ public Mono run(CancellationToken cancellationToken) { case TRANSIENT_ERROR: { // Retry on transient (429) errors if (clientException.getRetryAfterDuration().toMillis() > 0) { - System.out.println("Retrying 429"); + logger.warn("Retrying transient error after {}", clientException.getRetryAfterDuration()); Instant stopTimer = Instant.now().plus(clientException.getRetryAfterDuration().toMillis(), MILLIS); return Mono.just(clientException.getRetryAfterDuration().toMillis()) // set some seed value to be able to run // the repeat loop diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java index 6cd323479ba3b..b171f2a07c183 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java @@ -121,9 +121,14 @@ public Mono run(CancellationToken cancellationToken) { return this.documentClient.createDocumentChangeFeedQuery( this.settings.getCollectionSelfLink(), this.options, - JsonNode.class).limitRequest(1); - }) + JsonNode.class).take(1, true).log("limitRequest"); + }, 1, 1) .flatMap(documentFeedResponse -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); final String continuationToken = documentFeedResponse.getContinuationToken(); @@ -227,6 +232,7 @@ public Mono run(CancellationToken cancellationToken) { case TRANSIENT_ERROR: { // Retry on transient (429) errors if (clientException.getRetryAfterDuration().toMillis() > 0) { + logger.warn("Retrying Transient Error."); Instant stopTimer = Instant.now().plus(clientException.getRetryAfterDuration().toMillis(), MILLIS); return Mono.just(clientException.getRetryAfterDuration().toMillis()) // set some seed value to be able to run // the repeat loop diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java index 0ab2f47917029..c0fbf34b2818a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java @@ -39,10 +39,12 @@ class ChangeFeedFetcher extends Fetcher { private final static ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor = ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor(); + private static final Logger logger = LoggerFactory.getLogger(ChangeFeedFetcher.class); private final ChangeFeedState changeFeedState; private final Supplier createRequestFunc; private final Supplier feedRangeContinuationRetryPolicySupplier; private final boolean completeAfterAllCurrentChangesRetrieved; + private int count = 0; private final Long endLSN; public ChangeFeedFetcher( @@ -115,6 +117,7 @@ private Mono> nextPageInternal(DocumentClientRetryPolicy retryPo return Mono.fromSupplier(() -> nextPageCore(retryPolicy)) .flatMap(Function.identity()) .flatMap((r) -> { + logger.warn("ChangeFeedFetcher: received {} results", r.getResults().size()); FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation(); @@ -171,6 +174,11 @@ protected boolean isFullyDrained(boolean isChangeFeed, FeedResponse response) } FeedRangeContinuation continuation = this.changeFeedState.getContinuation(); + logger.warn("ChangeFeedFetcher: isFullyDrained: {}, --- {}", continuation.isDone(), this.changeFeedState.getContinuation()); + if (count == 1) { + Thread.dumpStack(); + } + count++; return continuation != null && continuation.isDone(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java index dad3ff190abba..2a7c9287061e9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java @@ -131,17 +131,14 @@ private void updateState(FeedResponse response, RxDocumentServiceRequest requ (top.get() != 0) && // if fullyDrained then done !this.isFullyDrained(this.isChangeFeed, response)) { - shouldFetchMore.set(true); } else { shouldFetchMore.set(false); } - if (logger.isDebugEnabled()) { - logger.debug("Fetcher state updated: " + + logger.warn("Fetcher state updated: " + "isChangeFeed = {}, continuation token = {}, max item count = {}, should fetch more = {}, Context: {}", isChangeFeed, this.getContinuationForLogging(), maxItemCount.get(), shouldFetchMore.get(), this.operationContextTextProvider.get()); - } } protected void reEnableShouldFetchMoreForRetry() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java index b41f6cfb1cdfb..6889ae4a0ae4c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java @@ -101,6 +101,8 @@ public static Flux> getChangeFeedQueryResultAsObservable( Long endLsn, OperationContextAndListenerTuple operationContext) { + logger.warn("Change feed fetcher execution with prefetch count: {}", preFetchCount); + return getPaginatedQueryResultAsObservable( () -> new ChangeFeedFetcher<>( client, From a6b408488f650eeeb183730ce6faf82c2ca6e81e Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Thu, 16 Jan 2025 14:38:29 -0800 Subject: [PATCH 03/11] Fixed prefetching issue --- .../changefeed/common/ChangeFeedContextClientImpl.java | 2 +- .../changefeed/epkversion/ChangeFeedProcessorImplBase.java | 1 - .../changefeed/epkversion/PartitionProcessorImpl.java | 7 +++++-- .../changefeed/pkversion/PartitionProcessorImpl.java | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java index 24e8b808315b7..40130196cb789 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java @@ -151,7 +151,7 @@ public Flux> createDocumentChangeFeedQuery(CosmosAsyncConta } return collectionLink .queryChangeFeed(changeFeedRequestOptions, klass) - .byPage() + .byPage().take(1, true) .publishOn(this.scheduler); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java index bef9beccd058c..e2e543d2df17b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java @@ -232,7 +232,6 @@ public Mono> getCurrentState() { return this.feedContextClient .createDocumentChangeFeedQuery(this.feedContextClient.getContainerClient(), options, ChangeFeedProcessorItem.class, false) - .take(1) .map(feedResponse -> { ChangeFeedProcessorState changeFeedProcessorState = new ChangeFeedProcessorState() .setHostName(lease.getOwner()) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index 47dcc9791965a..8457d76898451 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -5,6 +5,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.ThroughputControlGroupConfig; import com.azure.cosmos.implementation.CosmosSchedulers; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; @@ -33,6 +34,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Optional; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -57,6 +59,8 @@ class PartitionProcessorImpl implements PartitionProcessor { private volatile String lastServerContinuationToken; private volatile boolean hasMoreResults; private final FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager; + private static final ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.CosmosChangeFeedRequestOptionsAccessor optionsAccessor = + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor(); public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, @@ -119,8 +123,7 @@ public Mono run(CancellationToken cancellationToken) { return this.documentClient.createDocumentChangeFeedQuery( this.settings.getCollectionSelfLink(), this.options, - itemType) - .take(1, true).log("limitRequest"); + itemType); }, 1, 1) .flatMap(documentFeedResponse -> { try { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java index b171f2a07c183..f26de6d10da63 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java @@ -121,7 +121,7 @@ public Mono run(CancellationToken cancellationToken) { return this.documentClient.createDocumentChangeFeedQuery( this.settings.getCollectionSelfLink(), this.options, - JsonNode.class).take(1, true).log("limitRequest"); + JsonNode.class); }, 1, 1) .flatMap(documentFeedResponse -> { try { From f1ae95addca087238e9086aeef54f01b794f8bd4 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 17 Jan 2025 17:22:57 -0800 Subject: [PATCH 04/11] Add test for ff --- .../cosmos/CosmosContainerChangeFeedTest.java | 53 ++++++ .../FullFidelityChangeFeedProcessorTest.java | 101 ++++++++++++ .../IncrementalChangeFeedProcessorTest.java | 151 +++++++++--------- .../IncrementalChangeFeedProcessorTest.java | 2 - .../common/ChangeFeedContextClientImpl.java | 12 +- .../epkversion/PartitionProcessorImpl.java | 7 +- 6 files changed, 236 insertions(+), 90 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 134a6a7e09d08..506723697988c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -51,6 +51,7 @@ import org.testng.annotations.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -323,6 +324,58 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro } } + @Test(groups = { "emulator" }, timeOut = TIMEOUT) + public void asyncChangeFeedPrefetching() throws Exception { + this.createContainer( + (cp) -> cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy()) + ); + insertDocuments(20, 7); + + + final Map continuations = new HashMap<>(); + + + + for (int i = 0; i < 20; i++) { + String pkValue = partitionKeyToDocuments.keySet().stream().skip(i).findFirst().get(); + logger.info(String.format("Initial validation - PK value: '%s'", pkValue)); + + final int initiallyDeletedDocuments = i < 2 ? 3 : 0; + final int expectedInitialEventCount = 7 - initiallyDeletedDocuments; + + CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning( + FeedRange.forLogicalPartition( + new PartitionKey(pkValue) + )); + AtomicInteger count = new AtomicInteger(0); + createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> { + count.incrementAndGet(); + }).byPage().publishOn(Schedulers.boundedElastic()); + + + + + } + + + Thread.sleep(3000); + + for (int i = 0; i < 20; i++) { + String pkValue = partitionKeyToDocuments.keySet().stream().skip(i).findFirst().get(); + logger.info(String.format("Validation after updates - PK value: '%s'", pkValue)); + + final int expectedEventCountAfterUpdates = i < 5 ? + i < 1 ? 0 : 2 // on the first logical partitions all updated documents were deleted + : 0; // no updates + + CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions + .createForProcessingFromContinuation(continuations.get(pkValue)); + + drainAndValidateChangeFeedResults(options, null, expectedEventCountAfterUpdates); + } + } + @Test(groups = { "emulator" }, timeOut = TIMEOUT) public void asyncChangeFeed_fromBeginning_incremental_forEPK() throws Exception { this.createContainer( diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index 7b4a2cd811f9c..06348b6acf2c1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -62,6 +62,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -187,6 +188,86 @@ public void fullFidelityChangeFeedProcessorStartFromNow(boolean isContextRequire } } + @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) + public void exhaustedRUs() throws InterruptedException { + CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); + + try { + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 100); + +// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders +// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) +// .build(); +// +// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); +// +// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() +// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) +// .operationType(FaultInjectionOperationType.REPLACE_ITEM) +// .build(); +// +// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder +// .condition(faultInjectionConditionForRegion) +// .result(serverErrorResult) +// .duration(Duration.ofSeconds(10)) +// .startDelay(Duration.ofSeconds(3)) +// .build(); +// +// CosmosFaultInjectionHelper +// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) +// .block(); + AtomicInteger counter = new AtomicInteger(0); + Set receivedLeaseTokensFromContext = ConcurrentHashMap.newKeySet(); + ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() + .hostName(hostName) + .handleAllVersionsAndDeletesChanges(changeFeedProcessorHandlerLag(receivedDocuments, receivedLeaseTokensFromContext)) + .feedContainer(createdFeedCollection) + .leaseContainer(createdLeaseCollection) + .options(new ChangeFeedProcessorOptions() + .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later + .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later + .setFeedPollDelay(Duration.ofSeconds(2)) + .setLeasePrefix("TEST") + .setMaxItemCount(10) + .setStartFromBeginning(true) + ) + .buildChangeFeedProcessor(); + + try { + changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout + .subscribe(); + } catch (Exception ex) { + logger.error("Change feed processor did not start in the expected time", ex); + throw ex; + } + + // Wait for the feed processor to receive and process the documents. + Thread.sleep(8 * CHANGE_FEED_PROCESSOR_TIMEOUT); + + assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); + + changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout + + for (InternalObjectNode item : createdDocuments) { + assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); + + assertThat(counter.get()).isEqualTo(10); + } + + // Wait for the feed processor to shutdown. + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + } finally { + safeDeleteCollection(createdFeedCollection); + safeDeleteCollection(createdLeaseCollection); + + // Allow some time for the collections to be deleted before exiting. + Thread.sleep(500); + } + } + // Steps followed in this test // 1. Start the AllVersionsAndDeletes / FULL_FIDELITY ChangeFeedProcessor with startFromBeginning set to 'false'. // 2 Ingest 10 documents into the feed container. @@ -312,6 +393,26 @@ public void fullFidelityChangeFeedModeToIncrementalChangeFeedMode(boolean isStar } } + + private BiConsumer, ChangeFeedProcessorContext> changeFeedProcessorHandlerLag( + Map receivedDocuments, Set receivedLeaseTokensFromContext) { + return (docs, context) -> { + logger.info("START processing from thread in test {}", Thread.currentThread().getId()); + for (ChangeFeedProcessorItem item : docs) { + processItem(item, receivedDocuments); + } + validateChangeFeedProcessorContext(context); + processChangeFeedProcessorContext(context, receivedLeaseTokensFromContext); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + logger.info("END processing from thread {}", Thread.currentThread().getId()); + }; + } + + // Steps followed in this test // 1. Ingest 10 documents into the feed container. // 2. Start the LatestVersion / INCREMENTAL ChangeFeedProcessor with either startFromBeginning set to 'true' or 'false'. diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index 59588f4fa2871..4d94fbc3b768e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -25,6 +25,7 @@ import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; @@ -57,6 +58,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.NullNode; import org.apache.commons.lang3.RandomStringUtils; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.SkipException; @@ -87,6 +89,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -204,82 +207,84 @@ public void readFeedDocumentsStartFromBeginning() throws InterruptedException { } } - @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) - public void exhaustedRUs() throws InterruptedException { - CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); - CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); - - try { - List createdDocuments = new ArrayList<>(); - Map receivedDocuments = new ConcurrentHashMap<>(); - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 100); - -// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders -// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) -// .build(); +// @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) +// public void exhaustedRUs() throws InterruptedException { +// CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); +// CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); // -// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); +// try { +// List createdDocuments = new ArrayList<>(); +// Map receivedDocuments = new ConcurrentHashMap<>(); +// setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 100); // -// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() -// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) -// .operationType(FaultInjectionOperationType.REPLACE_ITEM) -// .build(); +//// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders +//// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) +//// .build(); +//// +//// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); +//// +//// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() +//// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) +//// .operationType(FaultInjectionOperationType.REPLACE_ITEM) +//// .build(); +//// +//// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder +//// .condition(faultInjectionConditionForRegion) +//// .result(serverErrorResult) +//// .duration(Duration.ofSeconds(10)) +//// .startDelay(Duration.ofSeconds(3)) +//// .build(); +//// +//// CosmosFaultInjectionHelper +//// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) +//// .block(); +// AtomicInteger counter = new AtomicInteger(0); +// changeFeedProcessor = new ChangeFeedProcessorBuilder() +// .hostName(hostName) +// .handleLatestVersionChanges(changeFeedProcessorHandlerLag(receivedDocuments, counter)) +// .feedContainer(createdFeedCollection) +// .leaseContainer(createdLeaseCollection) +// .options(new ChangeFeedProcessorOptions() +// .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later +// .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later +// .setFeedPollDelay(Duration.ofSeconds(2)) +// .setLeasePrefix("TEST") +// .setMaxItemCount(10) +// .setStartFromBeginning(true) +// ) +// .buildChangeFeedProcessor(); // -// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder -// .condition(faultInjectionConditionForRegion) -// .result(serverErrorResult) -// .duration(Duration.ofSeconds(10)) -// .startDelay(Duration.ofSeconds(3)) -// .build(); +// try { +// changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout +// .subscribe(); +// } catch (Exception ex) { +// logger.error("Change feed processor did not start in the expected time", ex); +// throw ex; +// } // -// CosmosFaultInjectionHelper -// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) -// .block(); - - changeFeedProcessor = new ChangeFeedProcessorBuilder() - .hostName(hostName) - .handleLatestVersionChanges(changeFeedProcessorHandler(receivedDocuments)) - .feedContainer(createdFeedCollection) - .leaseContainer(createdLeaseCollection) - .options(new ChangeFeedProcessorOptions() - .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later - .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later - .setFeedPollDelay(Duration.ofSeconds(2)) - .setLeasePrefix("TEST") - .setMaxItemCount(10) - .setStartFromBeginning(true) - ) - .buildChangeFeedProcessor(); - - try { - changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout - .subscribe(); - } catch (Exception ex) { - logger.error("Change feed processor did not start in the expected time", ex); - throw ex; - } - - // Wait for the feed processor to receive and process the documents. - Thread.sleep(8 * CHANGE_FEED_PROCESSOR_TIMEOUT); - - assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); - - changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout - - for (InternalObjectNode item : createdDocuments) { - assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); - } - - // Wait for the feed processor to shutdown. - Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); - } finally { - safeDeleteCollection(createdFeedCollection); - safeDeleteCollection(createdLeaseCollection); - - // Allow some time for the collections to be deleted before exiting. - Thread.sleep(500); - } - } +// // Wait for the feed processor to receive and process the documents. +// Thread.sleep(8 * CHANGE_FEED_PROCESSOR_TIMEOUT); +// +// assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); +// +// changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout +// +// for (InternalObjectNode item : createdDocuments) { +// assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); +// +// assertThat(counter.get()).isEqualTo(10); +// } +// +// // Wait for the feed processor to shutdown. +// Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); +// } finally { +// safeDeleteCollection(createdFeedCollection); +// safeDeleteCollection(createdLeaseCollection); +// +// // Allow some time for the collections to be deleted before exiting. +// Thread.sleep(500); +// } +// } @Test(groups = { "query" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void readFeedDocumentsStartFromCustomDate() throws InterruptedException { @@ -2149,6 +2154,8 @@ private Consumer> changeFeedProcessorHandler(Map> changeFeedProcessorHandlerWithCallback( Map receivedDocuments, Callable callBackFunc) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 3acce47095c13..acf0c1a9cd579 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -92,8 +92,6 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase { ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); private CosmosAsyncDatabase createdDatabase; -// private final String databaseId = "testdb1"; -// private final String hostName = "TestHost1"; private final String hostName = RandomStringUtils.randomAlphabetic(6); private final int FEED_COUNT = 10; private final int CHANGE_FEED_PROCESSOR_TIMEOUT = 5000; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java index 40130196cb789..430943dfe4eb3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java @@ -151,7 +151,7 @@ public Flux> createDocumentChangeFeedQuery(CosmosAsyncConta } return collectionLink .queryChangeFeed(changeFeedRequestOptions, klass) - .byPage().take(1, true) + .byPage() .publishOn(this.scheduler); } @@ -200,16 +200,8 @@ public Flux> deleteAllItems(List Mono> replaceItem(String itemId, PartitionKey partitionKey, T document, CosmosItemRequestOptions options) { - if (count.compareAndSet(1, 2)) { - Map map = new HashMap<>(); - map.put("x-ms-retry-after-ms", "1000"); - throw new com.azure.cosmos.CosmosException(429, new com.azure.cosmos.implementation.CosmosError(), map); - - } else { - count.incrementAndGet(); - return cosmosContainer.replaceItem(document, itemId, partitionKey, options) + return cosmosContainer.replaceItem(document, itemId, partitionKey, options) .publishOn(this.scheduler); - } } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index 8457d76898451..ed0333f73b408 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -123,14 +123,9 @@ public Mono run(CancellationToken cancellationToken) { return this.documentClient.createDocumentChangeFeedQuery( this.settings.getCollectionSelfLink(), this.options, - itemType); + itemType).take(1, true); }, 1, 1) .flatMap(documentFeedResponse -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); final String continuationToken = documentFeedResponse.getContinuationToken(); From ee28e03cb6817a881e26ec442e53ca6c5625de2a Mon Sep 17 00:00:00 2001 From: Tomas Varon Saldarriaga Date: Sat, 18 Jan 2025 00:27:11 -0800 Subject: [PATCH 05/11] Add test --- .../directconnectivity/ReflectionUtils.java | 11 +-- .../FullFidelityChangeFeedProcessorTest.java | 77 ++++++------------- .../CosmosChangeFeedRequestOptionsImpl.java | 16 ++-- .../common/ChangeFeedContextClientImpl.java | 2 +- .../ChangeFeedProcessorContextImpl.java | 3 + 5 files changed, 46 insertions(+), 63 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index fd04c961143b1..f268c9cccf626 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -3,11 +3,7 @@ package com.azure.cosmos.implementation.directconnectivity; -import com.azure.cosmos.CosmosAsyncClient; -import com.azure.cosmos.CosmosAsyncContainer; -import com.azure.cosmos.CosmosBridgeInternal; -import com.azure.cosmos.CosmosClient; -import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.*; import com.azure.cosmos.implementation.ApiType; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.ClientSideRequestStatistics; @@ -45,6 +41,7 @@ import com.azure.cosmos.implementation.throughputControl.controller.request.GlobalThroughputRequestController; import com.azure.cosmos.implementation.throughputControl.controller.request.PkRangesThroughputRequestController; import com.azure.cosmos.models.CosmosClientTelemetryConfig; +import com.azure.cosmos.models.FeedResponse; import io.netty.handler.ssl.SslContext; import org.apache.commons.lang3.reflect.FieldUtils; @@ -179,6 +176,10 @@ public static GlobalAddressResolver getGlobalAddressResolver(RxDocumentClientImp return get(GlobalAddressResolver.class, rxDocumentClient, "addressResolver"); } + public static FeedResponse getFeedResponse(ChangeFeedProcessorContext context) { + return get(FeedResponse.class, context, "feedResponse"); + } + public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index 06348b6acf2c1..78e6b39be8c38 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -22,18 +22,8 @@ import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1; -import com.azure.cosmos.models.ChangeFeedProcessorItem; -import com.azure.cosmos.models.ChangeFeedProcessorOptions; -import com.azure.cosmos.models.ChangeFeedProcessorState; -import com.azure.cosmos.models.CosmosContainerProperties; -import com.azure.cosmos.models.CosmosContainerRequestOptions; -import com.azure.cosmos.models.CosmosItemRequestOptions; -import com.azure.cosmos.models.CosmosItemResponse; -import com.azure.cosmos.models.CosmosQueryRequestOptions; -import com.azure.cosmos.models.PartitionKey; -import com.azure.cosmos.models.SqlParameter; -import com.azure.cosmos.models.SqlQuerySpec; -import com.azure.cosmos.models.ThroughputProperties; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.models.*; import com.azure.cosmos.rx.TestSuiteBase; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -61,7 +51,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -188,74 +180,51 @@ public void fullFidelityChangeFeedProcessorStartFromNow(boolean isContextRequire } } - @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) - public void exhaustedRUs() throws InterruptedException { + @Test(groups = {"query" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) + public void prefetching() throws InterruptedException { CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 100); - -// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders -// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) -// .build(); -// -// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); -// -// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() -// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) -// .operationType(FaultInjectionOperationType.REPLACE_ITEM) -// .build(); -// -// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder -// .condition(faultInjectionConditionForRegion) -// .result(serverErrorResult) -// .duration(Duration.ofSeconds(10)) -// .startDelay(Duration.ofSeconds(3)) -// .build(); -// -// CosmosFaultInjectionHelper -// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) -// .block(); - AtomicInteger counter = new AtomicInteger(0); Set receivedLeaseTokensFromContext = ConcurrentHashMap.newKeySet(); + AtomicInteger counter = new AtomicInteger(0); + AtomicBoolean noPrefetch = new AtomicBoolean(true); ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() .hostName(hostName) - .handleAllVersionsAndDeletesChanges(changeFeedProcessorHandlerLag(receivedDocuments, receivedLeaseTokensFromContext)) + .handleAllVersionsAndDeletesChanges(changeFeedProcessorHandlerLag(receivedDocuments, receivedLeaseTokensFromContext, counter, noPrefetch)) .feedContainer(createdFeedCollection) .leaseContainer(createdLeaseCollection) .options(new ChangeFeedProcessorOptions() - .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later - .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later .setFeedPollDelay(Duration.ofSeconds(2)) .setLeasePrefix("TEST") .setMaxItemCount(10) - .setStartFromBeginning(true) ) .buildChangeFeedProcessor(); try { - changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout - .subscribe(); + changeFeedProcessor.start().timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribeOn(Schedulers.boundedElastic()).subscribe(); } catch (Exception ex) { logger.error("Change feed processor did not start in the expected time", ex); - throw ex; + throw new RuntimeException(ex); } + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + + setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 30); + + // Wait for the feed processor to receive and process the documents. - Thread.sleep(8 * CHANGE_FEED_PROCESSOR_TIMEOUT); + Thread.sleep(3 * CHANGE_FEED_PROCESSOR_TIMEOUT); assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); - changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout + validateChangeFeedProcessing(changeFeedProcessor, createdDocuments, receivedDocuments, 30, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT); + changeFeedProcessor.stop().timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribeOn(Schedulers.boundedElastic()).block(); + assertThat(noPrefetch.get()).isTrue(); - for (InternalObjectNode item : createdDocuments) { - assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); - assertThat(counter.get()).isEqualTo(10); - } // Wait for the feed processor to shutdown. Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); @@ -395,8 +364,12 @@ public void fullFidelityChangeFeedModeToIncrementalChangeFeedMode(boolean isStar private BiConsumer, ChangeFeedProcessorContext> changeFeedProcessorHandlerLag( - Map receivedDocuments, Set receivedLeaseTokensFromContext) { + Map receivedDocuments, Set receivedLeaseTokensFromContext, AtomicInteger counter, AtomicBoolean noPrefetch) { return (docs, context) -> { + FeedResponse feedResponse = ReflectionUtils.getFeedResponse(context); + if (Integer.parseInt(feedResponse.getResponseHeaders().get("x-ms-cosmos-llsn")) > counter.incrementAndGet() * 10 + 1) { + noPrefetch.set(false); + } logger.info("START processing from thread in test {}", Thread.currentThread().getId()); for (ChangeFeedProcessorItem item : docs) { processItem(item, receivedDocuments); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index c7346570ec275..61ebc645cdd84 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -57,7 +57,11 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { // this.continuationState = toBeCloned.continuationState; - this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) toBeCloned.continuationState); + if (toBeCloned.continuationState != null) { + this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) toBeCloned.continuationState); + } else { + this.continuationState = null; + } this.feedRangeInternal = toBeCloned.feedRangeInternal; this.properties = toBeCloned.properties; this.maxItemCount = toBeCloned.maxItemCount; @@ -77,8 +81,6 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB this.keywordIdentifiers = toBeCloned.keywordIdentifiers; this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved; this.endLSN = toBeCloned.endLSN; - - logger.warn("Cloning options with continuation state {}", continuationState.getContinuation()); } public CosmosChangeFeedRequestOptionsImpl( @@ -100,7 +102,12 @@ public CosmosChangeFeedRequestOptionsImpl( this.maxPrefetchPageCount = DEFAULT_MAX_PREFETCH_PAGE_COUNT; this.feedRangeInternal = feedRange; this.startFromInternal = startFromInternal; - this.continuationState = continuationState; // this should be a clone + if (continuationState != null) { + this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) continuationState); + } else { + this.continuationState = null; + } + if (mode != ChangeFeedMode.INCREMENTAL && mode != ChangeFeedMode.FULL_FIDELITY) { throw new IllegalArgumentException( @@ -117,7 +124,6 @@ public CosmosChangeFeedRequestOptionsImpl( this.properties = new HashMap<>(); this.isSplitHandlingDisabled = false; this.completeAfterAllCurrentChangesRetrieved = DEFAULT_COMPLETE_AFTER_ALL_CURRENT_CHANGES_RETRIEVED; - logger.warn("Creating options with continuation state {}", continuationState.getContinuation()); } public ChangeFeedState getContinuation() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java index 430943dfe4eb3..20cf49a15ba61 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java @@ -151,7 +151,7 @@ public Flux> createDocumentChangeFeedQuery(CosmosAsyncConta } return collectionLink .queryChangeFeed(changeFeedRequestOptions, klass) - .byPage() + .byPage().take(1, true) .publishOn(this.scheduler); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java index 38ee0a0694683..bcbcad72ac7eb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java @@ -6,13 +6,16 @@ import com.azure.cosmos.ChangeFeedProcessorContext; import com.azure.cosmos.CosmosDiagnosticsContext; import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext; +import com.azure.cosmos.models.FeedResponse; public final class ChangeFeedProcessorContextImpl implements ChangeFeedProcessorContext { private final ChangeFeedObserverContext changeFeedObserverContext; + private final FeedResponse feedResponse; public ChangeFeedProcessorContextImpl(ChangeFeedObserverContext changeFeedObserverContext) { this.changeFeedObserverContext = changeFeedObserverContext; + this.feedResponse = changeFeedObserverContext.getFeedResponse(); } @Override From d926e50941d43cf58cf1f925a61cfa76819d1ef5 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Sun, 19 Jan 2025 23:36:42 -0800 Subject: [PATCH 06/11] Revert logs and add test to validate prefetch --- .../cosmos/CosmosContainerChangeFeedTest.java | 55 +++------ .../directconnectivity/ReflectionUtils.java | 11 +- .../FullFidelityChangeFeedProcessorTest.java | 105 ++++-------------- .../IncrementalChangeFeedProcessorTest.java | 85 -------------- .../IncrementalChangeFeedProcessorTest.java | 96 ---------------- .../com/azure/cosmos/CosmosException.java | 2 +- .../CosmosChangeFeedRequestOptionsImpl.java | 4 - .../common/ChangeFeedContextClientImpl.java | 5 +- .../ChangeFeedProcessorContextImpl.java | 3 - .../changefeed/common/ChangeFeedStateV1.java | 4 - .../epkversion/PartitionProcessorImpl.java | 9 +- .../pkversion/PartitionProcessorImpl.java | 8 +- .../query/ChangeFeedFetcher.java | 8 -- .../cosmos/implementation/query/Fetcher.java | 5 +- .../implementation/query/Paginator.java | 2 - 15 files changed, 47 insertions(+), 355 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 506723697988c..a680aaceccfb4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -329,51 +329,24 @@ public void asyncChangeFeedPrefetching() throws Exception { this.createContainer( (cp) -> cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy()) ); - insertDocuments(20, 7); - - - final Map continuations = new HashMap<>(); - - - - for (int i = 0; i < 20; i++) { - String pkValue = partitionKeyToDocuments.keySet().stream().skip(i).findFirst().get(); - logger.info(String.format("Initial validation - PK value: '%s'", pkValue)); - - final int initiallyDeletedDocuments = i < 2 ? 3 : 0; - final int expectedInitialEventCount = 7 - initiallyDeletedDocuments; - - CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions - .createForProcessingFromBeginning( - FeedRange.forLogicalPartition( - new PartitionKey(pkValue) - )); - AtomicInteger count = new AtomicInteger(0); - createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> { - count.incrementAndGet(); - }).byPage().publishOn(Schedulers.boundedElastic()); - - - - - } + insertDocuments(1, 20); + CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forFullRange()).setMaxItemCount(10); + AtomicInteger count = new AtomicInteger(0); + // Will keep grabbing pages + createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> + count.incrementAndGet()).byPage().subscribe(); Thread.sleep(3000); + assertThat(count.get()).isNotEqualTo(2); - for (int i = 0; i < 20; i++) { - String pkValue = partitionKeyToDocuments.keySet().stream().skip(i).findFirst().get(); - logger.info(String.format("Validation after updates - PK value: '%s'", pkValue)); - - final int expectedEventCountAfterUpdates = i < 5 ? - i < 1 ? 0 : 2 // on the first logical partitions all updated documents were deleted - : 0; // no updates - - CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions - .createForProcessingFromContinuation(continuations.get(pkValue)); - - drainAndValidateChangeFeedResults(options, null, expectedEventCountAfterUpdates); - } + count.set(0); + // should only get two pages + createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> + count.incrementAndGet()).byPage().take(2, true).subscribe(); + Thread.sleep(3000); + assertThat(count.get()).isEqualTo(2); } @Test(groups = { "emulator" }, timeOut = TIMEOUT) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index f268c9cccf626..fd04c961143b1 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -3,7 +3,11 @@ package com.azure.cosmos.implementation.directconnectivity; -import com.azure.cosmos.*; +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosBridgeInternal; +import com.azure.cosmos.CosmosClient; +import com.azure.cosmos.CosmosClientBuilder; import com.azure.cosmos.implementation.ApiType; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.ClientSideRequestStatistics; @@ -41,7 +45,6 @@ import com.azure.cosmos.implementation.throughputControl.controller.request.GlobalThroughputRequestController; import com.azure.cosmos.implementation.throughputControl.controller.request.PkRangesThroughputRequestController; import com.azure.cosmos.models.CosmosClientTelemetryConfig; -import com.azure.cosmos.models.FeedResponse; import io.netty.handler.ssl.SslContext; import org.apache.commons.lang3.reflect.FieldUtils; @@ -176,10 +179,6 @@ public static GlobalAddressResolver getGlobalAddressResolver(RxDocumentClientImp return get(GlobalAddressResolver.class, rxDocumentClient, "addressResolver"); } - public static FeedResponse getFeedResponse(ChangeFeedProcessorContext context) { - return get(FeedResponse.class, context, "feedResponse"); - } - public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index 78e6b39be8c38..94ae932bcce95 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -22,8 +22,18 @@ import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1; -import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; -import com.azure.cosmos.models.*; +import com.azure.cosmos.models.ChangeFeedProcessorItem; +import com.azure.cosmos.models.ChangeFeedProcessorOptions; +import com.azure.cosmos.models.ChangeFeedProcessorState; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosContainerRequestOptions; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.SqlParameter; +import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.rx.TestSuiteBase; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -51,7 +61,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -180,63 +189,6 @@ public void fullFidelityChangeFeedProcessorStartFromNow(boolean isContextRequire } } - @Test(groups = {"query" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) - public void prefetching() throws InterruptedException { - CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); - CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); - - try { - List createdDocuments = new ArrayList<>(); - Map receivedDocuments = new ConcurrentHashMap<>(); - Set receivedLeaseTokensFromContext = ConcurrentHashMap.newKeySet(); - AtomicInteger counter = new AtomicInteger(0); - AtomicBoolean noPrefetch = new AtomicBoolean(true); - ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() - .hostName(hostName) - .handleAllVersionsAndDeletesChanges(changeFeedProcessorHandlerLag(receivedDocuments, receivedLeaseTokensFromContext, counter, noPrefetch)) - .feedContainer(createdFeedCollection) - .leaseContainer(createdLeaseCollection) - .options(new ChangeFeedProcessorOptions() - .setFeedPollDelay(Duration.ofSeconds(2)) - .setLeasePrefix("TEST") - .setMaxItemCount(10) - ) - .buildChangeFeedProcessor(); - - try { - changeFeedProcessor.start().timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribeOn(Schedulers.boundedElastic()).subscribe(); - } catch (Exception ex) { - logger.error("Change feed processor did not start in the expected time", ex); - throw new RuntimeException(ex); - } - Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); - - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 30); - - - - // Wait for the feed processor to receive and process the documents. - Thread.sleep(3 * CHANGE_FEED_PROCESSOR_TIMEOUT); - - assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); - - validateChangeFeedProcessing(changeFeedProcessor, createdDocuments, receivedDocuments, 30, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT); - changeFeedProcessor.stop().timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribeOn(Schedulers.boundedElastic()).block(); - assertThat(noPrefetch.get()).isTrue(); - - - - // Wait for the feed processor to shutdown. - Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); - } finally { - safeDeleteCollection(createdFeedCollection); - safeDeleteCollection(createdLeaseCollection); - - // Allow some time for the collections to be deleted before exiting. - Thread.sleep(500); - } - } - // Steps followed in this test // 1. Start the AllVersionsAndDeletes / FULL_FIDELITY ChangeFeedProcessor with startFromBeginning set to 'false'. // 2 Ingest 10 documents into the feed container. @@ -362,30 +314,6 @@ public void fullFidelityChangeFeedModeToIncrementalChangeFeedMode(boolean isStar } } - - private BiConsumer, ChangeFeedProcessorContext> changeFeedProcessorHandlerLag( - Map receivedDocuments, Set receivedLeaseTokensFromContext, AtomicInteger counter, AtomicBoolean noPrefetch) { - return (docs, context) -> { - FeedResponse feedResponse = ReflectionUtils.getFeedResponse(context); - if (Integer.parseInt(feedResponse.getResponseHeaders().get("x-ms-cosmos-llsn")) > counter.incrementAndGet() * 10 + 1) { - noPrefetch.set(false); - } - logger.info("START processing from thread in test {}", Thread.currentThread().getId()); - for (ChangeFeedProcessorItem item : docs) { - processItem(item, receivedDocuments); - } - validateChangeFeedProcessorContext(context); - processChangeFeedProcessorContext(context, receivedLeaseTokensFromContext); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - logger.info("END processing from thread {}", Thread.currentThread().getId()); - }; - } - - // Steps followed in this test // 1. Ingest 10 documents into the feed container. // 2. Start the LatestVersion / INCREMENTAL ChangeFeedProcessor with either startFromBeginning set to 'true' or 'false'. @@ -1874,13 +1802,13 @@ private void waitToReceiveDocuments(Map receive assertThat(remainingWork > 0).as("Failed to receive all the feed documents").isTrue(); } - @BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT) + @BeforeClass(groups = { "emulator", "query" }, timeOut = SETUP_TIMEOUT) public void before_FullFidelityChangeFeedProcessorTest() { client = getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); } - @AfterClass(groups = { "emulator" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = { "emulator", "query" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } @@ -1929,6 +1857,11 @@ private CosmosAsyncContainer createFeedCollection(CosmosAsyncDatabase database, return createCollection(database, getCollectionDefinitionWithFullFidelity(), optionsFeedCollection, provisionedThroughput); } + private CosmosAsyncContainer createFeedCollectionWithoutRetention(int provisionedThroughput) { + CosmosContainerRequestOptions optionsFeedCollection = new CosmosContainerRequestOptions(); + return createCollection(createdDatabase, getCollectionDefinition(UUID.randomUUID().toString()), optionsFeedCollection, provisionedThroughput); + } + private CosmosAsyncContainer createLeaseCollection(int provisionedThroughput) { return createLeaseCollection(createdDatabase, provisionedThroughput); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index 4d94fbc3b768e..0156873860efc 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -25,7 +25,6 @@ import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; @@ -46,7 +45,6 @@ import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; -import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; import com.azure.cosmos.test.faultinjection.FaultInjectionRule; @@ -58,7 +56,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.NullNode; import org.apache.commons.lang3.RandomStringUtils; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.SkipException; @@ -89,7 +86,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -207,85 +203,6 @@ public void readFeedDocumentsStartFromBeginning() throws InterruptedException { } } -// @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) -// public void exhaustedRUs() throws InterruptedException { -// CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); -// CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); -// -// try { -// List createdDocuments = new ArrayList<>(); -// Map receivedDocuments = new ConcurrentHashMap<>(); -// setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 100); -// -//// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders -//// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) -//// .build(); -//// -//// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); -//// -//// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() -//// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) -//// .operationType(FaultInjectionOperationType.REPLACE_ITEM) -//// .build(); -//// -//// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder -//// .condition(faultInjectionConditionForRegion) -//// .result(serverErrorResult) -//// .duration(Duration.ofSeconds(10)) -//// .startDelay(Duration.ofSeconds(3)) -//// .build(); -//// -//// CosmosFaultInjectionHelper -//// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) -//// .block(); -// AtomicInteger counter = new AtomicInteger(0); -// changeFeedProcessor = new ChangeFeedProcessorBuilder() -// .hostName(hostName) -// .handleLatestVersionChanges(changeFeedProcessorHandlerLag(receivedDocuments, counter)) -// .feedContainer(createdFeedCollection) -// .leaseContainer(createdLeaseCollection) -// .options(new ChangeFeedProcessorOptions() -// .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later -// .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later -// .setFeedPollDelay(Duration.ofSeconds(2)) -// .setLeasePrefix("TEST") -// .setMaxItemCount(10) -// .setStartFromBeginning(true) -// ) -// .buildChangeFeedProcessor(); -// -// try { -// changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()) // add timeout -// .subscribe(); -// } catch (Exception ex) { -// logger.error("Change feed processor did not start in the expected time", ex); -// throw ex; -// } -// -// // Wait for the feed processor to receive and process the documents. -// Thread.sleep(8 * CHANGE_FEED_PROCESSOR_TIMEOUT); -// -// assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); -// -// changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); // add timeout -// -// for (InternalObjectNode item : createdDocuments) { -// assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); -// -// assertThat(counter.get()).isEqualTo(10); -// } -// -// // Wait for the feed processor to shutdown. -// Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); -// } finally { -// safeDeleteCollection(createdFeedCollection); -// safeDeleteCollection(createdLeaseCollection); -// -// // Allow some time for the collections to be deleted before exiting. -// Thread.sleep(500); -// } -// } - @Test(groups = { "query" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void readFeedDocumentsStartFromCustomDate() throws InterruptedException { CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); @@ -2154,8 +2071,6 @@ private Consumer> changeFeedProcessorHandler(Map> changeFeedProcessorHandlerWithCallback( Map receivedDocuments, Callable callBackFunc) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index acf0c1a9cd579..4db3a10fe7599 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -4,7 +4,6 @@ import com.azure.cosmos.ChangeFeedProcessor; import com.azure.cosmos.ChangeFeedProcessorBuilder; -import com.azure.cosmos.ConnectionMode; import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosAsyncContainer; @@ -36,16 +35,6 @@ import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.rx.TestSuiteBase; -import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; -import com.azure.cosmos.test.faultinjection.FaultInjectionCondition; -import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; -import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; -import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; -import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; -import com.azure.cosmos.test.faultinjection.FaultInjectionRule; -import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; -import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResult; -import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -76,7 +65,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @@ -204,90 +192,6 @@ public void readFeedDocumentsStartFromBeginning() throws InterruptedException { } } - @Test(groups = {"query" }/*, timeOut = 2 * TIMEOUT*/) - public void exhaustedRUs() throws InterruptedException { - CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); - CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); - - try { - List createdDocuments = new ArrayList<>(); - Map receivedDocuments = new ConcurrentHashMap<>(); - -// FaultInjectionServerErrorResult serverErrorResult = FaultInjectionResultBuilders -// .getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) -// .build(); -// -// FaultInjectionRuleBuilder faultInjectionRuleBuilder = new FaultInjectionRuleBuilder("faultInjectionRule-" + UUID.randomUUID()); -// -// FaultInjectionCondition faultInjectionConditionForRegion = new FaultInjectionConditionBuilder() -// .connectionType(clientAccessor.getConnectionMode(client).equals(ConnectionMode.DIRECT.toString()) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY) -// .operationType(FaultInjectionOperationType.REPLACE_ITEM) -// .build(); -// -// FaultInjectionRule faultInjectionRule = faultInjectionRuleBuilder -// .condition(faultInjectionConditionForRegion) -// .result(serverErrorResult) -// .duration(Duration.ofSeconds(10)) -// .startDelay(Duration.ofSeconds(3)) -// .build(); -// -// CosmosFaultInjectionHelper -// .configureFaultInjectionRules(createdLeaseCollection, Arrays.asList(faultInjectionRule)) -// .block(); - - CompletableFuture setupDocumentsFuture = CompletableFuture.runAsync(() -> { - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, 200); - }); - - changeFeedProcessor = new ChangeFeedProcessorBuilder() - .hostName(hostName) - .handleChanges(changeFeedProcessorHandler(receivedDocuments)) - .feedContainer(createdFeedCollection) - .leaseContainer(createdLeaseCollection) - .options(new ChangeFeedProcessorOptions() - .setLeaseRenewInterval(Duration.ofMinutes(10)) // only for debugging remove later - .setLeaseExpirationInterval(Duration.ofMinutes(11)) // only for debugging remove later - .setFeedPollDelay(Duration.ofSeconds(2)) - .setLeasePrefix("TEST") - .setMaxItemCount(10) - .setStartFromBeginning(true) - ) - .buildChangeFeedProcessor(); - - CompletableFuture startProcessorFuture = CompletableFuture.runAsync(() -> { - try { - changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic()).subscribe(); - } catch (Exception ex) { - logger.error("Change feed processor did not start in the expected time", ex); - throw new RuntimeException(ex); - } - }); - - CompletableFuture.allOf(setupDocumentsFuture, startProcessorFuture).join(); - - // Wait for the feed processor to receive and process the documents. - Thread.sleep(10 * CHANGE_FEED_PROCESSOR_TIMEOUT); - - assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue(); - - changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).subscribe(); - - for (InternalObjectNode item : createdDocuments) { - assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); - } - - // Wait for the feed processor to shutdown. - Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); - } finally { - safeDeleteCollection(createdFeedCollection); - safeDeleteCollection(createdLeaseCollection); - - // Allow some time for the collections to be deleted before exiting. - Thread.sleep(500); - } - } - - @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void readFeedDocumentsStartFromCustomDate() throws InterruptedException { CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java index 6bd340fc20bb8..0cf4870cf485b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosException.java @@ -205,7 +205,7 @@ protected CosmosException(int statusCode, Exception innerException) { * @param cosmosErrorResource the error resource object. * @param responseHeaders the response headers. */ - public CosmosException(int statusCode, CosmosError cosmosErrorResource, Map responseHeaders) { + protected CosmosException(int statusCode, CosmosError cosmosErrorResource, Map responseHeaders) { this(/* resourceAddress */ null, statusCode, cosmosErrorResource, responseHeaders); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index 61ebc645cdd84..8ed54d006e4f2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -19,8 +19,6 @@ import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.PartitionKeyDefinition; import com.azure.cosmos.util.Beta; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -53,10 +51,8 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ private Set keywordIdentifiers; private boolean completeAfterAllCurrentChangesRetrieved; private Long endLSN; - private static final Logger logger = LoggerFactory.getLogger(CosmosChangeFeedRequestOptionsImpl.class); public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { -// this.continuationState = toBeCloned.continuationState; if (toBeCloned.continuationState != null) { this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) toBeCloned.continuationState); } else { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java index 20cf49a15ba61..d48c307333781 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java @@ -37,14 +37,11 @@ import java.net.URI; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static com.azure.cosmos.CosmosBridgeInternal.getContextClient; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; -import static java.lang.Thread.sleep; /** * Implementation for ChangeFeedDocumentClient. @@ -201,7 +198,7 @@ public Flux> deleteAllItems(List Mono> replaceItem(String itemId, PartitionKey partitionKey, T document, CosmosItemRequestOptions options) { return cosmosContainer.replaceItem(document, itemId, partitionKey, options) - .publishOn(this.scheduler); + .publishOn(this.scheduler); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java index bcbcad72ac7eb..38ee0a0694683 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedProcessorContextImpl.java @@ -6,16 +6,13 @@ import com.azure.cosmos.ChangeFeedProcessorContext; import com.azure.cosmos.CosmosDiagnosticsContext; import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext; -import com.azure.cosmos.models.FeedResponse; public final class ChangeFeedProcessorContextImpl implements ChangeFeedProcessorContext { private final ChangeFeedObserverContext changeFeedObserverContext; - private final FeedResponse feedResponse; public ChangeFeedProcessorContextImpl(ChangeFeedObserverContext changeFeedObserverContext) { this.changeFeedObserverContext = changeFeedObserverContext; - this.feedResponse = changeFeedObserverContext.getFeedResponse(); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java index 33363029ec570..a406b669d3887 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java @@ -10,8 +10,6 @@ import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.query.CompositeContinuationToken; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -20,7 +18,6 @@ import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; public class ChangeFeedStateV1 extends ChangeFeedState { - private static final Logger logger = LoggerFactory.getLogger(ChangeFeedStateV1.class); private final String containerRid; private final FeedRangeInternal feedRange; private final ChangeFeedMode mode; @@ -66,7 +63,6 @@ public FeedRangeContinuation getContinuation() { public ChangeFeedState setContinuation(FeedRangeContinuation continuation) { checkNotNull(continuation, "Argument 'continuation' must not be null."); continuation.validateContainer(this.containerRid); - logger.warn("Setting continuation: {}", this.continuation); this.continuation = continuation; return this; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index ed0333f73b408..855e36c8d9f17 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -5,7 +5,6 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.ThroughputControlGroupConfig; import com.azure.cosmos.implementation.CosmosSchedulers; -import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; @@ -34,7 +33,6 @@ import java.time.Duration; import java.time.Instant; -import java.util.Optional; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -59,8 +57,6 @@ class PartitionProcessorImpl implements PartitionProcessor { private volatile String lastServerContinuationToken; private volatile boolean hasMoreResults; private final FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager; - private static final ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.CosmosChangeFeedRequestOptionsAccessor optionsAccessor = - ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor(); public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, @@ -123,8 +119,8 @@ public Mono run(CancellationToken cancellationToken) { return this.documentClient.createDocumentChangeFeedQuery( this.settings.getCollectionSelfLink(), this.options, - itemType).take(1, true); - }, 1, 1) + itemType); + }) .flatMap(documentFeedResponse -> { if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); @@ -232,7 +228,6 @@ public Mono run(CancellationToken cancellationToken) { case TRANSIENT_ERROR: { // Retry on transient (429) errors if (clientException.getRetryAfterDuration().toMillis() > 0) { - logger.warn("Retrying transient error after {}", clientException.getRetryAfterDuration()); Instant stopTimer = Instant.now().plus(clientException.getRetryAfterDuration().toMillis(), MILLIS); return Mono.just(clientException.getRetryAfterDuration().toMillis()) // set some seed value to be able to run // the repeat loop diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java index f26de6d10da63..56d405fb3e426 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java @@ -122,13 +122,8 @@ public Mono run(CancellationToken cancellationToken) { this.settings.getCollectionSelfLink(), this.options, JsonNode.class); - }, 1, 1) + }) .flatMap(documentFeedResponse -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); final String continuationToken = documentFeedResponse.getContinuationToken(); @@ -232,7 +227,6 @@ public Mono run(CancellationToken cancellationToken) { case TRANSIENT_ERROR: { // Retry on transient (429) errors if (clientException.getRetryAfterDuration().toMillis() > 0) { - logger.warn("Retrying Transient Error."); Instant stopTimer = Instant.now().plus(clientException.getRetryAfterDuration().toMillis(), MILLIS); return Mono.just(clientException.getRetryAfterDuration().toMillis()) // set some seed value to be able to run // the repeat loop diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java index c0fbf34b2818a..0ab2f47917029 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java @@ -39,12 +39,10 @@ class ChangeFeedFetcher extends Fetcher { private final static ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseAccessor = ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor(); - private static final Logger logger = LoggerFactory.getLogger(ChangeFeedFetcher.class); private final ChangeFeedState changeFeedState; private final Supplier createRequestFunc; private final Supplier feedRangeContinuationRetryPolicySupplier; private final boolean completeAfterAllCurrentChangesRetrieved; - private int count = 0; private final Long endLSN; public ChangeFeedFetcher( @@ -117,7 +115,6 @@ private Mono> nextPageInternal(DocumentClientRetryPolicy retryPo return Mono.fromSupplier(() -> nextPageCore(retryPolicy)) .flatMap(Function.identity()) .flatMap((r) -> { - logger.warn("ChangeFeedFetcher: received {} results", r.getResults().size()); FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation(); @@ -174,11 +171,6 @@ protected boolean isFullyDrained(boolean isChangeFeed, FeedResponse response) } FeedRangeContinuation continuation = this.changeFeedState.getContinuation(); - logger.warn("ChangeFeedFetcher: isFullyDrained: {}, --- {}", continuation.isDone(), this.changeFeedState.getContinuation()); - if (count == 1) { - Thread.dumpStack(); - } - count++; return continuation != null && continuation.isDone(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java index 2a7c9287061e9..dad3ff190abba 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Fetcher.java @@ -131,14 +131,17 @@ private void updateState(FeedResponse response, RxDocumentServiceRequest requ (top.get() != 0) && // if fullyDrained then done !this.isFullyDrained(this.isChangeFeed, response)) { + shouldFetchMore.set(true); } else { shouldFetchMore.set(false); } - logger.warn("Fetcher state updated: " + + if (logger.isDebugEnabled()) { + logger.debug("Fetcher state updated: " + "isChangeFeed = {}, continuation token = {}, max item count = {}, should fetch more = {}, Context: {}", isChangeFeed, this.getContinuationForLogging(), maxItemCount.get(), shouldFetchMore.get(), this.operationContextTextProvider.get()); + } } protected void reEnableShouldFetchMoreForRetry() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java index 6889ae4a0ae4c..b41f6cfb1cdfb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java @@ -101,8 +101,6 @@ public static Flux> getChangeFeedQueryResultAsObservable( Long endLsn, OperationContextAndListenerTuple operationContext) { - logger.warn("Change feed fetcher execution with prefetch count: {}", preFetchCount); - return getPaginatedQueryResultAsObservable( () -> new ChangeFeedFetcher<>( client, From b4ae773f15bcd6de45468bc17a4f5a74b1595e4d Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Sun, 19 Jan 2025 23:40:33 -0800 Subject: [PATCH 07/11] unused imports --- .../azure/cosmos/CosmosContainerChangeFeedTest.java | 1 - .../FullFidelityChangeFeedProcessorTest.java | 11 ++--------- .../IncrementalChangeFeedProcessorTest.java | 3 --- .../common/ChangeFeedContextClientImpl.java | 2 -- 4 files changed, 2 insertions(+), 15 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index a680aaceccfb4..32e1cc2c81a3b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -51,7 +51,6 @@ import org.testng.annotations.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import java.nio.charset.StandardCharsets; import java.time.Duration; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java index 94ae932bcce95..7b4a2cd811f9c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java @@ -62,8 +62,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -1802,13 +1800,13 @@ private void waitToReceiveDocuments(Map receive assertThat(remainingWork > 0).as("Failed to receive all the feed documents").isTrue(); } - @BeforeClass(groups = { "emulator", "query" }, timeOut = SETUP_TIMEOUT) + @BeforeClass(groups = { "emulator" }, timeOut = SETUP_TIMEOUT) public void before_FullFidelityChangeFeedProcessorTest() { client = getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); } - @AfterClass(groups = { "emulator", "query" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = { "emulator" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } @@ -1857,11 +1855,6 @@ private CosmosAsyncContainer createFeedCollection(CosmosAsyncDatabase database, return createCollection(database, getCollectionDefinitionWithFullFidelity(), optionsFeedCollection, provisionedThroughput); } - private CosmosAsyncContainer createFeedCollectionWithoutRetention(int provisionedThroughput) { - CosmosContainerRequestOptions optionsFeedCollection = new CosmosContainerRequestOptions(); - return createCollection(createdDatabase, getCollectionDefinition(UUID.randomUUID().toString()), optionsFeedCollection, provisionedThroughput); - } - private CosmosAsyncContainer createLeaseCollection(int provisionedThroughput) { return createLeaseCollection(createdDatabase, provisionedThroughput); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index 0156873860efc..c9693cc619315 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -19,7 +19,6 @@ import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.TestConfigurations; @@ -98,8 +97,6 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase { private final static Logger logger = LoggerFactory.getLogger(IncrementalChangeFeedProcessorTest.class); private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper(); - private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor clientAccessor = - ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); private CosmosAsyncDatabase createdDatabase; private final String hostName = RandomStringUtils.randomAlphabetic(6); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java index d48c307333781..b231547f3f85d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java @@ -38,7 +38,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import static com.azure.cosmos.CosmosBridgeInternal.getContextClient; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -54,7 +53,6 @@ public class ChangeFeedContextClientImpl implements ChangeFeedContextClient { private Scheduler scheduler; private static final ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.CosmosAsyncDatabaseAccessor cosmosAsyncDatabaseAccessor = ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor(); - private AtomicInteger count = new AtomicInteger(0); /** * Initializes a new instance of the {@link ChangeFeedContextClient} interface. From e4f1170fe629cd25523774f56463b5fae25166d2 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Sun, 19 Jan 2025 23:41:52 -0800 Subject: [PATCH 08/11] unused imports --- .../pkversion/IncrementalChangeFeedProcessorTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 4db3a10fe7599..5de7e69e5d2ad 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -16,7 +16,6 @@ import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; -import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalObjectNode; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.Utils; @@ -76,8 +75,6 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase { private final static Logger log = LoggerFactory.getLogger(IncrementalChangeFeedProcessorTest.class); private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper(); - private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor clientAccessor = - ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); private CosmosAsyncDatabase createdDatabase; private final String hostName = RandomStringUtils.randomAlphabetic(6); From f296c7d58efb8e4561d84cc32708426828ce9ef2 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Mon, 20 Jan 2025 18:35:01 -0800 Subject: [PATCH 09/11] Added Changelog --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 71a32eb345023..2892323ed1e33 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixes an issue in change feed processor where records are skipped and excessive requests are prefetched. - See [PR 43788](https://github.com/Azure/azure-sdk-for-java/pull/43788) #### Other Changes From cffc19ac08466277a472a7d0b592d5f3b028c8ee Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Tue, 21 Jan 2025 15:04:59 -0800 Subject: [PATCH 10/11] React to comments --- .../cosmos/CosmosContainerChangeFeedTest.java | 70 +++++++++++++++---- .../IncrementalChangeFeedProcessorTest.java | 2 + 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 32e1cc2c81a3b..3f0c2330c9494 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -10,6 +10,7 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.RetryAnalyzer; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1; import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; @@ -70,6 +71,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -119,6 +121,14 @@ public static Object[][] changeFeedQueryEndLSNDataProvider() { }; } + @DataProvider(name = "changeFeedQueryPrefetchingDataProvider") + public static Object[][] changeFeedQueryPrefetchingDataProvider() { + return new Object[][]{ + {ChangeFeedMode.FULL_FIDELITY}, + { ChangeFeedMode.INCREMENTAL}, + }; + } + @DataProvider(name = "changeFeedQueryEndLSNHangDataProvider") public static Object[][] changeFeedQueryEndLSNHangDataProvider() { return new Object[][]{ @@ -323,27 +333,59 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro } } - @Test(groups = { "emulator" }, timeOut = TIMEOUT) - public void asyncChangeFeedPrefetching() throws Exception { + @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT) + public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception { this.createContainer( - (cp) -> cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy()) + (cp) -> { + if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) { + return cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy()); + } + return cp.setChangeFeedPolicy(ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10))); + } ); - insertDocuments(1, 20); - - CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions - .createForProcessingFromBeginning(FeedRange.forFullRange()).setMaxItemCount(10); + CosmosChangeFeedRequestOptions options; + if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) { + options = CosmosChangeFeedRequestOptions + .createForProcessingFromNow(FeedRange.forFullRange()) + .setMaxItemCount(10).allVersionsAndDeletes(); + } else { + options = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forFullRange()).setMaxItemCount(10); + } AtomicInteger count = new AtomicInteger(0); - // Will keep grabbing pages - createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> - count.incrementAndGet()).byPage().subscribe(); - + insertDocuments(5, 20); + AtomicReference continuation = new AtomicReference<>(""); + createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> { + count.incrementAndGet(); + continuation.set(r.getContinuationToken()); + } + ).byPage().subscribe(); + + CosmosChangeFeedRequestOptions optionsFF = null; + if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) { + insertDocuments(5, 20); + count.set(0); + optionsFF = CosmosChangeFeedRequestOptions + .createForProcessingFromContinuation(continuation.get()) + .setMaxItemCount(10).allVersionsAndDeletes(); + createdContainer.asyncContainer.queryChangeFeed(optionsFF, ObjectNode.class).handle((r) -> { + count.incrementAndGet(); + continuation.set(r.getContinuationToken()); + } + ).byPage().subscribe(); + } Thread.sleep(3000); - assertThat(count.get()).isNotEqualTo(2); + assertThat(count.get()).isGreaterThan(2); + if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) { + // full fidelity is only from now so need to insert more documents + insertDocuments(5, 20); + } count.set(0); // should only get two pages - createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> - count.incrementAndGet()).byPage().take(2, true).subscribe(); + createdContainer.asyncContainer.queryChangeFeed(changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)? optionsFF + : options, ObjectNode.class).handle((r) -> count.incrementAndGet()) + .byPage().take(2, true).subscribe(); Thread.sleep(3000); assertThat(count.get()).isEqualTo(2); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index 5de7e69e5d2ad..0f0ab04840e1d 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -77,6 +77,8 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase { private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper(); private CosmosAsyncDatabase createdDatabase; +// private final String databaseId = "testdb1"; +// private final String hostName = "TestHost1"; private final String hostName = RandomStringUtils.randomAlphabetic(6); private final int FEED_COUNT = 10; private final int CHANGE_FEED_PROCESSOR_TIMEOUT = 5000; From 6873e036a8309980c8aed1b2e47302b77f2d058f Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Tue, 21 Jan 2025 15:07:18 -0800 Subject: [PATCH 11/11] Unused import --- .../java/com/azure/cosmos/CosmosContainerChangeFeedTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 3f0c2330c9494..709231de78532 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -71,7 +71,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function;