Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CFP AVAD: Adds new FeedRange to ChangeFeedProcessorContext #4621

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,10 @@ public abstract class ChangeFeedProcessorContext
/// Gets the headers related to the service response that provided the changes.
/// </summary>
public abstract Headers Headers { get; }

/// <summary>
/// The <see cref="FeedRange"/> within the monitored container from which the changes emanated.
/// </summary>
public abstract FeedRange FeedRange { get; }
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ private void HandleFailedRequest(

private Task DispatchChangesAsync(ResponseMessage response, CancellationToken cancellationToken)
{
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(this.options.LeaseToken, response, this.checkpointer);
ChangeFeedObserverContextCore context = new ChangeFeedObserverContextCore(
this.options.LeaseToken,
response,
this.checkpointer,
this.options.FeedRange);
return this.observer.ProcessChangesAsync(context, response.Content, cancellationToken);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public override FeedProcessor Create(DocumentServiceLease lease, ChangeFeedObser
FeedPollDelay = this.changeFeedProcessorOptions.FeedPollDelay,
MaxItemCount = this.changeFeedProcessorOptions.MaxItemCount,
StartFromBeginning = this.changeFeedProcessorOptions.StartFromBeginning,
StartTime = this.changeFeedProcessorOptions.StartTime
StartTime = this.changeFeedProcessorOptions.StartTime,
FeedRange = lease.FeedRange,
};

PartitionCheckpointerCore checkpointer = new PartitionCheckpointerCore(this.leaseCheckpointer, lease);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing
{
using System;
using Microsoft.Azure.Documents;

internal class ProcessorOptions
{
Expand All @@ -22,5 +21,7 @@ internal class ProcessorOptions
public DateTime? StartTime { get; set; }

public TimeSpan RequestTimeout { get; set; } = CosmosHttpClient.GatewayRequestTimeout;

public FeedRange FeedRange { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
Expand All @@ -24,11 +23,13 @@ internal sealed class ChangeFeedObserverContextCore
internal ChangeFeedObserverContextCore(
string leaseToken,
ResponseMessage feedResponse,
PartitionCheckpointer checkpointer)
PartitionCheckpointer checkpointer,
FeedRange feedRange)
{
this.LeaseToken = leaseToken;
this.responseMessage = feedResponse;
this.checkpointer = checkpointer;
this.FeedRange = feedRange;
}

public string LeaseToken { get; }
Expand All @@ -37,6 +38,8 @@ internal ChangeFeedObserverContextCore(

public Headers Headers => this.responseMessage.Headers;

public FeedRange FeedRange { get; }

public async Task CheckpointAsync()
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ public ChangeFeedProcessorContextCore(ChangeFeedObserverContextCore changeFeedOb
public override CosmosDiagnostics Diagnostics => this.changeFeedObserverContextCore.Diagnostics;

public override Headers Headers => this.changeFeedObserverContextCore.Headers;

public override FeedRange FeedRange => this.changeFeedObserverContextCore.FeedRange;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public async Task TestWithRunningProcessor()
int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
.GetChangeFeedProcessorBuilder("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
this.ValidateContext(context);
await this.ValidateContextAsync(context);
processedDocCount += docs.Count();
foreach (dynamic doc in docs)
{
Expand All @@ -60,8 +60,6 @@ public async Task TestWithRunningProcessor()
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
})
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();
Expand Down Expand Up @@ -95,7 +93,7 @@ public async Task TestWithRunningProcessor_WithManualCheckpoint()
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilderWithManualCheckpoint("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, Func<Task> checkpointAsync, CancellationToken token) =>
{
this.ValidateContext(context);
await this.ValidateContextAsync(context);
processedDocCount += docs.Count();
foreach (dynamic doc in docs)
{
Expand Down Expand Up @@ -131,14 +129,13 @@ public async Task TestWithRunningProcessor_WithManualCheckpoint()
leaseReleaseCount++;
return Task.CompletedTask;
})
.WithErrorNotification((string leaseToken, Exception exception) =>
.WithErrorNotification(async (string leaseToken, Exception exception) =>
{
errorCount++;
ChangeFeedProcessorUserException cfpException = exception as ChangeFeedProcessorUserException;
Assert.IsNotNull(cfpException);
Assert.ReferenceEquals(exceptionToPropagate, exception.InnerException);
this.ValidateContext(cfpException.ChangeFeedProcessorContext);
return Task.CompletedTask;
await this.ValidateContextAsync(cfpException.ChangeFeedProcessorContext);
})
.WithLeaseContainer(this.LeaseContainer).Build();

Expand Down Expand Up @@ -179,9 +176,9 @@ await NonPartitionedContainerHelper.CreateNonPartitionedContainer(
int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
.GetChangeFeedProcessorBuilder("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
this.ValidateContext(context);
await this.ValidateContextAsync(context);
processedDocCount += docs.Count();
foreach (dynamic doc in docs)
{
Expand All @@ -192,8 +189,6 @@ await NonPartitionedContainerHelper.CreateNonPartitionedContainer(
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
})
.WithInstanceName("random")
.WithLeaseContainer(fixedLeasesContainer).Build();
Expand Down Expand Up @@ -240,9 +235,9 @@ public async Task TestReducePageSizeScenario()
int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
.GetChangeFeedProcessorBuilder("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
this.ValidateContext(context);
await this.ValidateContextAsync(context);
processedDocCount += docs.Count();
foreach (dynamic doc in docs)
{
Expand All @@ -253,8 +248,6 @@ public async Task TestReducePageSizeScenario()
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
})
.WithStartFromBeginning()
.WithInstanceName("random")
Expand Down Expand Up @@ -296,9 +289,9 @@ public async Task TestWithStartTime_Beginning()
int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
.GetChangeFeedProcessorBuilder("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
this.ValidateContext(context);
await this.ValidateContextAsync(context);
Assert.IsTrue(docs.Count > 0);
processedDocCount += docs.Count;
foreach (dynamic doc in docs)
Expand All @@ -310,8 +303,6 @@ public async Task TestWithStartTime_Beginning()
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
})
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.WithInstanceName("random")
Expand Down Expand Up @@ -351,9 +342,9 @@ public async Task TestWithStartTime_CustomTime()
int processedDocCount = 0;
string accumulator = string.Empty;
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
.GetChangeFeedProcessorBuilder("test", async (ChangeFeedProcessorContext context, IReadOnlyCollection<dynamic> docs, CancellationToken token) =>
{
this.ValidateContext(context);
await this.ValidateContextAsync(context);
Assert.IsTrue(docs.Count > 0);
processedDocCount += docs.Count;
foreach (dynamic doc in docs)
Expand All @@ -365,8 +356,6 @@ public async Task TestWithStartTime_CustomTime()
{
allDocsProcessed.Set();
}

return Task.CompletedTask;
})
.WithStartTime(now)
.WithInstanceName("random")
Expand All @@ -380,7 +369,7 @@ public async Task TestWithStartTime_CustomTime()
Assert.AreEqual("doc5.doc6.doc7.doc8.doc9.", accumulator);
}

private void ValidateContext(ChangeFeedProcessorContext changeFeedProcessorContext)
private async Task ValidateContextAsync(ChangeFeedProcessorContext changeFeedProcessorContext)
{
Assert.IsNotNull(changeFeedProcessorContext.LeaseToken);
Assert.IsNotNull(changeFeedProcessorContext.Diagnostics);
Expand All @@ -389,6 +378,17 @@ private void ValidateContext(ChangeFeedProcessorContext changeFeedProcessorConte
Assert.IsTrue(changeFeedProcessorContext.Headers.RequestCharge > 0);
string diagnosticsAsString = changeFeedProcessorContext.Diagnostics.ToString();
Assert.IsTrue(diagnosticsAsString.Contains("Change Feed Processor Read Next Async"));

await this.ValidateFeedRangeAsync(changeFeedProcessorContext.FeedRange);
}

private async Task ValidateFeedRangeAsync(FeedRange feedRange)
{
Assert.IsNotNull(feedRange);

IEnumerable<string> partitionKeyRanges = await this.Container.GetPartitionKeyRangesAsync(feedRange);

Assert.IsNotNull(partitionKeyRanges);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AutoCheckPointTests()

ResponseMessage responseMessage = new ResponseMessage();
responseMessage.Headers.ContinuationToken = Guid.NewGuid().ToString();
this.observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, this.partitionCheckpointer.Object);
this.observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, this.partitionCheckpointer.Object, FeedRangeEpk.FullRange);
}

[TestMethod]
Expand Down Expand Up @@ -78,7 +78,7 @@ public async Task ProcessChanges_WhenCheckpointThrows_ShouldThrow()

ResponseMessage responseMessage = new ResponseMessage();
responseMessage.Headers.ContinuationToken = Guid.NewGuid().ToString();
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore observerContext = new ChangeFeedObserverContextCore(Guid.NewGuid().ToString(), feedResponse: responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException caught = await Assert.ThrowsExceptionAsync<CosmosException>(() => this.sut.ProcessChangesAsync(observerContext, this.stream, CancellationToken.None));
Assert.AreEqual(original, caught);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void ExposesResponseProperties()
ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK);
responseMessage.Headers.RequestCharge = 10;

ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>());
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, Mock.Of<PartitionCheckpointer>(), FeedRangeEpk.FullRange);

Assert.AreEqual(leaseToken, changeFeedObserverContextCore.LeaseToken);
Assert.ReferenceEquals(responseMessage.Headers, changeFeedObserverContextCore.Headers);
Expand All @@ -40,7 +40,7 @@ public async Task TryCheckpoint_OnSuccess()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).Returns(Task.CompletedTask);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

await changeFeedObserverContextCore.CheckpointAsync();
}
Expand All @@ -54,7 +54,7 @@ public async Task TryCheckpoint_OnLeaseLost()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(new LeaseLostException());
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.AreEqual(HttpStatusCode.PreconditionFailed, exception.StatusCode);
Expand All @@ -70,7 +70,7 @@ public async Task TryCheckpoint_OnCosmosException()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

CosmosException exception = await Assert.ThrowsExceptionAsync<CosmosException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
Expand All @@ -86,7 +86,7 @@ public async Task TryCheckpoint_OnUnknownException()
responseMessage.Headers.ContinuationToken = continuation;
Mock<PartitionCheckpointer> checkpointer = new Mock<PartitionCheckpointer>();
checkpointer.Setup(c => c.CheckpointPartitionAsync(It.Is<string>(s => s == continuation))).ThrowsAsync(cosmosException);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object);
ChangeFeedObserverContextCore changeFeedObserverContextCore = new ChangeFeedObserverContextCore(leaseToken, responseMessage, checkpointer.Object, FeedRangeEpk.FullRange);

NotImplementedException exception = await Assert.ThrowsExceptionAsync<NotImplementedException>(() => changeFeedObserverContextCore.CheckpointAsync());
Assert.ReferenceEquals(cosmosException, exception);
Expand Down
Loading
Loading