diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 134a6a7e09d08..709231de78532 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -10,6 +10,7 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.RetryAnalyzer; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1; import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; @@ -119,6 +120,14 @@ public static Object[][] changeFeedQueryEndLSNDataProvider() { }; } + @DataProvider(name = "changeFeedQueryPrefetchingDataProvider") + public static Object[][] changeFeedQueryPrefetchingDataProvider() { + return new Object[][]{ + {ChangeFeedMode.FULL_FIDELITY}, + { ChangeFeedMode.INCREMENTAL}, + }; + } + @DataProvider(name = "changeFeedQueryEndLSNHangDataProvider") public static Object[][] changeFeedQueryEndLSNHangDataProvider() { return new Object[][]{ @@ -323,6 +332,63 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro } } + @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT) + public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception { + this.createContainer( + (cp) -> { + if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) { + return cp.setChangeFeedPolicy(ChangeFeedPolicy.createLatestVersionPolicy()); + } + return cp.setChangeFeedPolicy(ChangeFeedPolicy.createAllVersionsAndDeletesPolicy(Duration.ofMinutes(10))); + } + ); + CosmosChangeFeedRequestOptions options; + if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) { + options = CosmosChangeFeedRequestOptions + .createForProcessingFromNow(FeedRange.forFullRange()) + .setMaxItemCount(10).allVersionsAndDeletes(); + } else { + options = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forFullRange()).setMaxItemCount(10); + } + AtomicInteger count = new AtomicInteger(0); + insertDocuments(5, 20); + AtomicReference continuation = new AtomicReference<>(""); + createdContainer.asyncContainer.queryChangeFeed(options, ObjectNode.class).handle((r) -> { + count.incrementAndGet(); + continuation.set(r.getContinuationToken()); + } + ).byPage().subscribe(); + + CosmosChangeFeedRequestOptions optionsFF = null; + if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) { + insertDocuments(5, 20); + count.set(0); + optionsFF = CosmosChangeFeedRequestOptions + .createForProcessingFromContinuation(continuation.get()) + .setMaxItemCount(10).allVersionsAndDeletes(); + createdContainer.asyncContainer.queryChangeFeed(optionsFF, ObjectNode.class).handle((r) -> { + count.incrementAndGet(); + continuation.set(r.getContinuationToken()); + } + ).byPage().subscribe(); + } + Thread.sleep(3000); + assertThat(count.get()).isGreaterThan(2); + + if (changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)) { + // full fidelity is only from now so need to insert more documents + insertDocuments(5, 20); + } + count.set(0); + // should only get two pages + createdContainer.asyncContainer.queryChangeFeed(changeFeedMode.equals(ChangeFeedMode.FULL_FIDELITY)? optionsFF + : options, ObjectNode.class).handle((r) -> count.incrementAndGet()) + .byPage().take(2, true).subscribe(); + Thread.sleep(3000); + assertThat(count.get()).isEqualTo(2); + } + @Test(groups = { "emulator" }, timeOut = TIMEOUT) public void asyncChangeFeed_fromBeginning_incremental_forEPK() throws Exception { this.createContainer( diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 71a32eb345023..2892323ed1e33 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixes an issue in change feed processor where records are skipped and excessive requests are prefetched. - See [PR 43788](https://github.com/Azure/azure-sdk-for-java/pull/43788) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index 4bbdf764cce38..8ed54d006e4f2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -11,6 +11,7 @@ import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal; import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple; import com.azure.cosmos.models.CosmosRequestOptions; @@ -52,7 +53,11 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ private Long endLSN; public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { - this.continuationState = toBeCloned.continuationState; + if (toBeCloned.continuationState != null) { + this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) toBeCloned.continuationState); + } else { + this.continuationState = null; + } this.feedRangeInternal = toBeCloned.feedRangeInternal; this.properties = toBeCloned.properties; this.maxItemCount = toBeCloned.maxItemCount; @@ -93,7 +98,12 @@ public CosmosChangeFeedRequestOptionsImpl( this.maxPrefetchPageCount = DEFAULT_MAX_PREFETCH_PAGE_COUNT; this.feedRangeInternal = feedRange; this.startFromInternal = startFromInternal; - this.continuationState = continuationState; + if (continuationState != null) { + this.continuationState = new ChangeFeedStateV1((ChangeFeedStateV1) continuationState); + } else { + this.continuationState = null; + } + if (mode != ChangeFeedMode.INCREMENTAL && mode != ChangeFeedMode.FULL_FIDELITY) { throw new IllegalArgumentException( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java index 9b3c58966495d..b231547f3f85d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedContextClientImpl.java @@ -7,12 +7,8 @@ import com.azure.cosmos.CosmosAsyncDatabase; import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.implementation.AsyncDocumentClient; -import com.azure.cosmos.implementation.ChangeFeedOperationState; -import com.azure.cosmos.implementation.Document; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; -import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionKeyRange; -import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.models.CosmosBulkOperationResponse; @@ -42,7 +38,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; import static com.azure.cosmos.CosmosBridgeInternal.getContextClient; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -151,7 +146,7 @@ public Flux> createDocumentChangeFeedQuery(CosmosAsyncConta } return collectionLink .queryChangeFeed(changeFeedRequestOptions, klass) - .byPage() + .byPage().take(1, true) .publishOn(this.scheduler); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java index 4311f1631462f..a406b669d3887 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedStateV1.java @@ -11,6 +11,8 @@ import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.query.CompositeContinuationToken; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -38,6 +40,20 @@ public ChangeFeedStateV1( this.mode = mode; } + public ChangeFeedStateV1(ChangeFeedStateV1 toBeCloned) { + this.containerRid = toBeCloned.containerRid; + this.feedRange = toBeCloned.feedRange; + this.startFromSettings = toBeCloned.startFromSettings; + if (toBeCloned.continuation != null) { + List compositeContinuationTokens = new ArrayList<>(); + compositeContinuationTokens.addAll(toBeCloned.continuation.getCompositeContinuationTokens()); + this.continuation = FeedRangeContinuation.create(toBeCloned.continuation.getContainerRid(), toBeCloned.continuation.getFeedRange(), compositeContinuationTokens); + } else { + this.continuation = null; + } + this.mode = toBeCloned.mode; + } + @Override public FeedRangeContinuation getContinuation() { return this.continuation; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java index bef9beccd058c..e2e543d2df17b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java @@ -232,7 +232,6 @@ public Mono> getCurrentState() { return this.feedContextClient .createDocumentChangeFeedQuery(this.feedContextClient.getContainerClient(), options, ChangeFeedProcessorItem.class, false) - .take(1) .map(feedResponse -> { ChangeFeedProcessorState changeFeedProcessorState = new ChangeFeedProcessorState() .setHostName(lease.getOwner()) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index 91c8dcb2d1466..855e36c8d9f17 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -119,8 +119,7 @@ public Mono run(CancellationToken cancellationToken) { return this.documentClient.createDocumentChangeFeedQuery( this.settings.getCollectionSelfLink(), this.options, - itemType) - .limitRequest(1); + itemType); }) .flatMap(documentFeedResponse -> { if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java index 6cd323479ba3b..56d405fb3e426 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java @@ -121,7 +121,7 @@ public Mono run(CancellationToken cancellationToken) { return this.documentClient.createDocumentChangeFeedQuery( this.settings.getCollectionSelfLink(), this.options, - JsonNode.class).limitRequest(1); + JsonNode.class); }) .flatMap(documentFeedResponse -> { if (cancellationToken.isCancellationRequested()) return Flux.error(new TaskCancelledException());