Skip to content

Commit

Permalink
[Internal] Request Hedging: Refactors HedgeRegion diagnostics field t…
Browse files Browse the repository at this point in the history
…o only show successful region (#4625)

* changed hedge region to only show sucessfull region

* added and updated tests

* discussion changes

* suggestions and test improvements

* test fix

* updated hedge context

* fix test

* no string conversion

* requested changes

* suggested changes

* remove and sort usings
  • Loading branch information
NaluTripician authored Aug 9, 2024
1 parent c98e632 commit c2043ae
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class CrossRegionParallelHedgingAvailabilityStrategy : AvailabilityStrategy
{
private const string HedgeRegions = "Hedge Regions";
private const string HedgeContext = "Hedge Context";
private const string HedgeContextOriginalRequest = "Original Request";
private const string HedgeContextHedgedRequest = "Hedged Request";
private const string ResponseRegion = "Response Region";

/// <summary>
/// Latency threshold which activates the first region hedging
Expand Down Expand Up @@ -123,9 +121,8 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(

List<Task> requestTasks = new List<Task>(hedgeRegions.Count + 1);

Task<(bool, ResponseMessage)> primaryRequest = null;

ResponseMessage responseMessage = null;
Task<HedgingResponse> primaryRequest = null;
HedgingResponse hedgeResponse = null;

//Send out hedged requests
for (int requestNumber = 0; requestNumber < hedgeRegions.Count; requestNumber++)
Expand All @@ -142,14 +139,15 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
primaryRequest = this.RequestSenderAndResultCheckAsync(
sender,
request,
hedgeRegions.ElementAt(requestNumber),
cancellationToken,
cancellationTokenSource);

requestTasks.Add(primaryRequest);
}
else
{
Task<(bool, ResponseMessage)> requestTask = this.CloneAndSendAsync(
Task<HedgingResponse> requestTask = this.CloneAndSendAsync(
sender: sender,
request: request,
clonedBody: clonedBody,
Expand Down Expand Up @@ -180,19 +178,18 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
AggregateException innerExceptions = completedTask.Exception.Flatten();
}

(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
if (isNonTransient)
hedgeResponse = await (Task<HedgingResponse>)completedTask;
if (hedgeResponse.IsNonTransient)
{
cancellationTokenSource.Cancel();
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeRegions,
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
//Take is not inclusive, so we need to add 1 to the request number which starts at 0
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
object.ReferenceEquals(primaryRequest, completedTask)
? HedgeContextOriginalRequest
: HedgeContextHedgedRequest);
return responseMessage;
hedgeRegions.Take(requestNumber + 1));
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}
}
Expand All @@ -210,19 +207,17 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
lastException = innerExceptions.InnerExceptions.FirstOrDefault();
}

(bool isNonTransient, responseMessage) = await (Task<(bool, ResponseMessage)>)completedTask;
if (isNonTransient || requestTasks.Count == 0)
hedgeResponse = await (Task<HedgingResponse>)completedTask;
if (hedgeResponse.IsNonTransient || requestTasks.Count == 0)
{
cancellationTokenSource.Cancel();
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeRegions,
HedgeRegionsToString(responseMessage.Diagnostics.GetContactedRegions()));
((CosmosTraceDiagnostics)responseMessage.Diagnostics).Value.AddOrUpdateDatum(
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
object.ReferenceEquals(primaryRequest, completedTask)
? HedgeContextOriginalRequest
: HedgeContextHedgedRequest);
return responseMessage;
hedgeRegions);
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}

Expand All @@ -231,13 +226,13 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
throw lastException;
}

Debug.Assert(responseMessage != null);
return responseMessage;
Debug.Assert(hedgeResponse != null);
return hedgeResponse.ResponseMessage;
}
}
}

private async Task<(bool, ResponseMessage)> CloneAndSendAsync(
private async Task<HedgingResponse> CloneAndSendAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
RequestMessage request,
CloneableStream clonedBody,
Expand All @@ -256,20 +251,23 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
clonedRequest.RequestOptions ??= new RequestOptions();

List<string> excludeRegions = new List<string>(hedgeRegions);
string region = excludeRegions[requestNumber];
excludeRegions.RemoveAt(requestNumber);
clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;

return await this.RequestSenderAndResultCheckAsync(
sender,
clonedRequest,
region,
cancellationToken,
cancellationTokenSource);
}
}

private async Task<(bool, ResponseMessage)> RequestSenderAndResultCheckAsync(
private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
RequestMessage request,
string hedgedRegion,
CancellationToken cancellationToken,
CancellationTokenSource cancellationTokenSource)
{
Expand All @@ -282,14 +280,15 @@ public override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
{
cancellationTokenSource.Cancel();
}
return (true, response);

return new HedgingResponse(true, response, hedgedRegion);
}

return (false, response);
return new HedgingResponse(false, response, hedgedRegion);
}
catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
{
return (false, null);
return new HedgingResponse(false, null, hedgedRegion);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -323,9 +322,18 @@ private static bool IsFinalResult(int statusCode, int subStatusCode)
return statusCode == (int)HttpStatusCode.NotFound && subStatusCode == (int)SubStatusCodes.Unknown;
}

private static string HedgeRegionsToString(IReadOnlyList<(string, Uri)> hedgeRegions)
private sealed class HedgingResponse
{
return string.Join(",", hedgeRegions);
public readonly bool IsNonTransient;
public readonly ResponseMessage ResponseMessage;
public readonly string ResponseRegion;

public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string responseRegion)
{
this.IsNonTransient = isNonTransient;
this.ResponseMessage = responseMessage;
this.ResponseRegion = responseRegion;
}
}
}
}
Loading

0 comments on commit c2043ae

Please sign in to comment.