Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CFP AVAD: Adds new FeedRange to ChangeFeedProcessorContext #4621

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public abstract class ChangeFeedProcessorContext
/// Gets the headers related to the service response that provided the changes.
/// </summary>
public abstract Headers Headers { get; }

/// <summary>
/// Gets the feed range.
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public abstract FeedRange FeedRange { get; }
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ private void HandleFailedRequest(

private Task DispatchChangesAsync(ResponseMessage response, CancellationToken cancellationToken)
{
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.options.LeaseToken, response, this.checkpointer);
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(
this.options.LeaseToken,
response,
this.checkpointer,
this.options.FeedRange);
return this.observer.ProcessChangesAsync(context, response.Content, cancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public override FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObser
FeedPollDelay = this.changeFeedProcessorOptions.FeedPollDelay,
MaxItemCount = this.changeFeedProcessorOptions.MaxItemCount,
StartFromBeginning = this.changeFeedProcessorOptions.StartFromBeginning,
StartTime = this.changeFeedProcessorOptions.StartTime
StartTime = this.changeFeedProcessorOptions.StartTime,
FeedRange = lease.FeedRange,
};

PartitionCheckpointerCore checkpointer = new PartitionCheckpointerCore(this.leaseCheckpointer, lease);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
using System;
using Microsoft.Azure.Documents;

internal class ProcessorOptions
{
Expand All @@ -22,5 +21,7 @@ internal class ProcessorOptions
public DateTime? StartTime { get; set; }

public TimeSpan RequestTimeout { get; set; } = CosmosHttpClient.GatewayRequestTimeout;

public FeedRangeInternal FeedRange { get; set; }
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
Expand All @@ -24,11 +23,13 @@ internal sealed class ChangeFeedObserverContextCore
internal ChangeFeedObserverContextCore(
string leaseToken,
ResponseMessage feedResponse,
PartitionCheckpointer checkpointer)
PartitionCheckpointer checkpointer,
FeedRange feedRange)
{
this.LeaseToken = leaseToken;
this.responseMessage = feedResponse;
this.checkpointer = checkpointer;
this.FeedRange = feedRange;
}

public string LeaseToken { get; }
Expand All @@ -37,6 +38,8 @@ internal ChangeFeedObserverContextCore(

public Headers Headers => this.responseMessage.Headers;

public FeedRange FeedRange { get; }

public async Task CheckpointAsync()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ public ChangeFeedProcessorContextCore(ChangeFeedObserverContextCore changeFeedOb
public override CosmosDiagnostics Diagnostics => this.changeFeedObserverContextCore.Diagnostics;

public override Headers Headers => this.changeFeedObserverContextCore.Headers;

public override FeedRange FeedRange => this.changeFeedObserverContextCore.FeedRange;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Services.Management.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
Expand All @@ -20,6 +21,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
[TestCategory("ChangeFeedProcessor")]
public class GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests : BaseChangeFeedClientHelper
{
private ContainerResponse containerResponse;

[TestInitialize]
public async Task TestInitialize()
{
Expand Down Expand Up @@ -573,20 +576,86 @@ private static async Task BuildChangeFeedProcessorWithAllVersionsAndDeletesAsync
private async Task<ContainerInternal> CreateMonitoredContainer(ChangeFeedMode changeFeedMode)
{
string PartitionKey = "/pk";
ContainerProperties properties = new ContainerProperties(id: Guid.NewGuid().ToString(),
ContainerProperties containerProperties = new ContainerProperties(id: Guid.NewGuid().ToString(),
partitionKeyPath: PartitionKey);

if (changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes)
{
properties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5);
properties.DefaultTimeToLive = -1;
containerProperties.ChangeFeedPolicy.FullFidelityRetention = TimeSpan.FromMinutes(5);
containerProperties.DefaultTimeToLive = -1;
}

ContainerResponse response = await this.database.CreateContainerAsync(properties,
this.containerResponse = await this.database.CreateContainerAsync(containerProperties,
throughput: 10000,
cancellationToken: this.cancellationToken);

return (ContainerInternal)response;
return (ContainerInternal)this.containerResponse;
}

[TestMethod]
[Owner("philipthomas-MSFT")]
[Description("Scenario: Test to confirm that FeedRange is coming back in the context.")]
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
public async Task WhenADocumentHasChangedThenFeedRangeInContextTestsAsync()
{
ContainerInternal monitoredContainer = await this.CreateMonitoredContainer(ChangeFeedMode.AllVersionsAndDeletes);
ManualResetEvent allDocsProcessed = new ManualResetEvent(false);
Exception exception = default;

ChangeFeedProcessor processor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes(processorName: "processor", onChangesDelegate: async (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<dynamic>> docs, CancellationToken token) =>
{
await GetChangeFeedProcessorBuilderWithAllVersionsAndDeletesTests.ValidateFeedRangeAsync(
this.GetClient(),
context.FeedRange,
containerRid: this.containerResponse.Resource.ResourceId);

allDocsProcessed.Set();

return;
})
.WithInstanceName(Guid.NewGuid().ToString())
.WithLeaseContainer(this.LeaseContainer)
.WithErrorNotification((leaseToken, error) =>
{
exception = error.InnerException;

return Task.CompletedTask;
})
.Build();

// Start the processor, insert 1 document to generate a checkpoint, modify it, and then delete it.
// 1 second delay between operations to get different timestamps.

await processor.StartAsync();
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);

await monitoredContainer.CreateItemAsync<dynamic>(new { id = "1", pk = "1", description = "original test" }, partitionKey: new PartitionKey("1"));
await Task.Delay(1000);

bool isStartOk = allDocsProcessed.WaitOne(10 * BaseChangeFeedClientHelper.ChangeFeedSetupTime);

await processor.StopAsync();

if (exception != default)
{
Assert.Fail(exception.ToString());
}
}

private static async Task ValidateFeedRangeAsync(CosmosClient cosmosClient, FeedRange feedRange, string containerRid)
{
Assert.IsNotNull(feedRange);

Routing.PartitionKeyRangeCache partitionKeyRangeCache = await cosmosClient.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton);
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
IReadOnlyList<Documents.PartitionKeyRange> parttionKeyRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync(
collectionRid: containerRid,
range: ((FeedRangeEpk)feedRange).Range,
trace: NoOpTrace.Singleton,
forceRefresh: true);

Assert.IsNotNull(parttionKeyRanges);
Logger.LogLine($"{nameof(parttionKeyRanges)} -> {JsonConvert.SerializeObject(parttionKeyRanges)}");
Assert.AreEqual(1, parttionKeyRanges.Count);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AutoCheckPointTests()

ResponseMessage responseMessage = new ResponseMessage();
responseMessage.Headers.ContinuationToken = Guid.NewGuid().ToString();
this.observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, this.partitionCheckpointer.Object);
this.observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, this.partitionCheckpointer.Object, FeedRangeEpk.FullRange);
}

[TestMethod]
Expand Down Expand Up @@ -78,7 +78,7 @@ public async Task ProcessChanges_WhenCheckpointThrows_ShouldThrow()

ResponseMessage responseMessage = new ResponseMessage();
responseMessage.Headers.ContinuationToken = Guid.NewGuid().ToString();
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException caught = await Assert.ThrowsExceptionAsync<CosmosException>(() => this.sut.ProcessChangesAsync(observerContext, this.stream, CancellationToken.None));
Assert.AreEqual(original, caught);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void ExposesResponseProperties()
ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK);
responseMessage.Headers.RequestCharge = 10;

ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

Assert.AreEqual(leaseToken, changeFeedObserverContextCore.LeaseToken);
Assert.ReferenceEquals(responseMessage.Headers, changeFeedObserverContextCore.Headers);
Expand All @@ -40,7 +40,7 @@ public async Task TryCheckpoint_OnSuccess()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).Returns(Task.CompletedTask);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

await changeFeedObserverContextCore.CheckpointAsync();
}
Expand All @@ -54,7 +54,7 @@ public async Task TryCheckpoint_OnLeaseLost()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(new LeaseLostException());
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.AreEqual(HttpStatusCode.PreconditionFailed, exception.StatusCode);
Expand All @@ -70,7 +70,7 @@ public async Task TryCheckpoint_OnCosmosException()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
Expand All @@ -86,7 +86,7 @@ public async Task TryCheckpoint_OnUnknownException()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

NotImplementedException exception = await Assert.ThrowsExceptionAsync<NotImplementedException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Task changesHandler(IReadOnlyCollection<dynamic> docs, CancellationToken token)
Assert.IsNotNull(changeFeedObserver);

ResponseMessage responseMessage = this.BuildResponseMessage();
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -67,7 +67,7 @@ Task changesHandler(ChangeFeedProcessorContext context, IReadOnlyCollection<dyna
Assert.IsNotNull(changeFeedObserver);

ResponseMessage responseMessage = this.BuildResponseMessage();
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -92,7 +92,7 @@ Task changesHandler(ChangeFeedProcessorContext context, IReadOnlyCollection<dyna
Assert.IsNotNull(changeFeedObserver);

ResponseMessage responseMessage = this.BuildResponseMessage();
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -117,7 +117,7 @@ Task changesHandler(ChangeFeedProcessorContext context, Stream stream, Cancellat
Assert.IsNotNull(changeFeedObserver);


ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand All @@ -141,7 +141,7 @@ Task changesHandler(ChangeFeedProcessorContext context, Stream stream, Func<Task

Assert.IsNotNull(changeFeedObserver);

ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

await changeFeedObserver.ProcessChangesAsync(context, responseMessage.Content, CancellationToken.None);
Assert.IsTrue(executed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ObserverExceptionTests
public void ValidateConstructor()
{
ResponseMessage responseMessage = new ResponseMessage();
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);
ChangeFeedProcessorContextCore changeFeedProcessorContext = new ChangeFeedProcessorContextCore(observerContext);
Exception exception = new Exception("randomMessage");
ChangeFeedProcessorUserException ex = new ChangeFeedProcessorUserException(exception, changeFeedProcessorContext);
Expand All @@ -35,7 +35,7 @@ public void ValidateConstructor()
public void ValidateSerialization_AllFields()
{
ResponseMessage responseMessage = new ResponseMessage();
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);
ChangeFeedProcessorContextCore changeFeedProcessorContext = new ChangeFeedProcessorContextCore(observerContext);
Exception exception = new Exception("randomMessage");
ChangeFeedProcessorUserException originalException = new ChangeFeedProcessorUserException(exception, changeFeedProcessorContext);
Expand Down
Loading
Loading