Skip to content

Commit

Permalink
Parallelize expired lease check in processor host (#333)
Browse files Browse the repository at this point in the history
* Parallelize expired lease check

* -
  • Loading branch information
serkantkaraca authored Oct 17, 2018
1 parent ee7804b commit cb3082e
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,16 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception
// Inspect all leases.
// Acquire any expired leases.
// Renew any leases that currently belong to us.
IEnumerable<Task<Lease>> gettingAllLeases = leaseManager.GetAllLeases();
List<Lease> leasesOwnedByOthers = new List<Lease>();
var gettingAllLeases = leaseManager.GetAllLeases();
var leasesOwnedByOthers = new List<Lease>();
var renewLeaseTasks = new List<Task>();
int ourLeaseCount = 0;

// First thing is first, renew owned leases.
foreach (Task<Lease> getLeaseTask in gettingAllLeases)
{
try
{
{
var lease = await getLeaseTask.ConfigureAwait(false);
allLeases[lease.PartitionId] = lease;
if (lease.Owner == this.host.HostName)
Expand Down Expand Up @@ -245,37 +245,36 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception

// Wait until we are done with renewing our own leases here.
// In theory, this should never throw, error are logged and notified in the renew tasks.
await Task.WhenAll(renewLeaseTasks.ToArray()).ConfigureAwait(false);
await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false);
ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished.");

// Check any expired leases that we can grab here.
foreach (var possibleLease in allLeases.Values)
{
try
var checkLeaseTasks = new List<Task>();
foreach (var possibleLease in allLeases.Values.Where(lease => lease.Owner != this.host.HostName))
{
checkLeaseTasks.Add(Task.Run(async () =>
{
if (await possibleLease.IsExpired().ConfigureAwait(false))
try
{
bool isExpiredLeaseOwned = possibleLease.Owner == this.host.HostName;
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Trying to acquire lease.");
if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false))
if (await possibleLease.IsExpired().ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Acquired lease.");

// Don't double count if we have already counted this lease at the beginning of the loop.
if (!isExpiredLeaseOwned)
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Trying to acquire lease.");
if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false))
{
ourLeaseCount++;
ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Acquired lease.");
}
}
}
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, possibleLease.PartitionId, "Failure during acquiring lease", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, possibleLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases);
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, possibleLease.PartitionId, "Failure during acquiring lease", e.ToString());
this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, possibleLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases);
}
}));
}

await Task.WhenAll(checkLeaseTasks);

// Grab more leases if available and needed for load balancing
if (leasesOwnedByOthers.Count > 0)
{
Expand Down

0 comments on commit cb3082e

Please sign in to comment.