Skip to content

Commit

Permalink
Fixes for new partition management (#717)
Browse files Browse the repository at this point in the history
* fix lease balancer logic for ownership leases

* exclude lease renewals from request throttling

* rename force to throttle as suggested

* fix missed occurrence of renaming throttle to force
  • Loading branch information
sebastianburckhardt authored Apr 28, 2022
1 parent e4405bc commit 516f999
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,10 @@ async Task<IDictionary<string, T>> TakeLeasesAsync()
}

int myCount = workerToShardCount[this.workerName];
int moreShardsNeeded = target - myCount;

// if we are stealing intent leases, we are trying to take as many as required to reach balance
// if we are aquiring owner leases, we want to acquire all of the expired leases
int moreShardsNeeded = this.options.ShouldStealLeases ? target - myCount : expiredLeases.Count;

if (moreShardsNeeded > 0)
{
Expand Down
20 changes: 13 additions & 7 deletions src/DurableTask.AzureStorage/Storage/AzureStorageClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,21 @@ public Task<T> MakeQueueStorageRequest<T>(Func<OperationContext, CancellationTok
public Task<T> MakeTableStorageRequest<T>(Func<OperationContext, CancellationToken, Task<T>> storageRequest, string operationName, string? clientRequestId = null) =>
this.MakeStorageRequest<T>(storageRequest, TableAccountName, operationName, clientRequestId);

public Task MakeBlobStorageRequest(Func<OperationContext, CancellationToken, Task> storageRequest, string operationName, string? clientRequestId = null) =>
this.MakeStorageRequest(storageRequest, BlobAccountName, operationName, clientRequestId);
public Task MakeBlobStorageRequest(Func<OperationContext, CancellationToken, Task> storageRequest, string operationName, string? clientRequestId = null, bool force = false) =>
this.MakeStorageRequest(storageRequest, BlobAccountName, operationName, clientRequestId, force);

public Task MakeQueueStorageRequest(Func<OperationContext, CancellationToken, Task> storageRequest, string operationName, string? clientRequestId = null) =>
this.MakeStorageRequest(storageRequest, QueueAccountName, operationName, clientRequestId);

public Task MakeTableStorageRequest(Func<OperationContext, CancellationToken, Task> storageRequest, string operationName, string? clientRequestId = null) =>
this.MakeStorageRequest(storageRequest, TableAccountName, operationName, clientRequestId);

private async Task<T> MakeStorageRequest<T>(Func<OperationContext, CancellationToken, Task<T>> storageRequest, string accountName, string operationName, string? clientRequestId = null)
private async Task<T> MakeStorageRequest<T>(Func<OperationContext, CancellationToken, Task<T>> storageRequest, string accountName, string operationName, string? clientRequestId = null, bool force = false)
{
await requestThrottleSemaphore.WaitAsync();
if (!force)
{
await requestThrottleSemaphore.WaitAsync();
}

try
{
Expand All @@ -138,12 +141,15 @@ private async Task<T> MakeStorageRequest<T>(Func<OperationContext, CancellationT
}
finally
{
requestThrottleSemaphore.Release();
if (!force)
{
requestThrottleSemaphore.Release();
}
}
}

private Task MakeStorageRequest(Func<OperationContext, CancellationToken, Task> storageRequest, string accountName, string operationName, string? clientRequestId = null) =>
this.MakeStorageRequest<object?>((context, cancellationToken) => WrapFunctionWithReturnType(storageRequest, context, cancellationToken), accountName, operationName, clientRequestId);
private Task MakeStorageRequest(Func<OperationContext, CancellationToken, Task> storageRequest, string accountName, string operationName, string? clientRequestId = null, bool force = false) =>
this.MakeStorageRequest<object?>((context, cancellationToken) => WrapFunctionWithReturnType(storageRequest, context, cancellationToken), accountName, operationName, clientRequestId, force);

private static async Task<object?> WrapFunctionWithReturnType(Func<OperationContext, CancellationToken, Task> storageRequest, OperationContext context, CancellationToken cancellationToken)
{
Expand Down
3 changes: 2 additions & 1 deletion src/DurableTask.AzureStorage/Storage/Blob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public async Task RenewLeaseAsync(string leaseId)
var requestOptions = new BlobRequestOptions { ServerTimeout = azureStorageClient.Settings.LeaseRenewInterval };
await this.azureStorageClient.MakeBlobStorageRequest(
(context, cancellationToken) => this.cloudBlockBlob.RenewLeaseAsync(AccessCondition.GenerateLeaseCondition(leaseId), requestOptions, context, cancellationToken),
"Blob RenewLease");
"Blob RenewLease",
force: true); // lease renewals should not be throttled
}

public async Task ReleaseLeaseAsync(string leaseId)
Expand Down

0 comments on commit 516f999

Please sign in to comment.