diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs index af3e937..86b8733 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs @@ -200,8 +200,8 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception // Inspect all leases. // Acquire any expired leases. // Renew any leases that currently belong to us. - IEnumerable> gettingAllLeases = leaseManager.GetAllLeases(); - List leasesOwnedByOthers = new List(); + var gettingAllLeases = leaseManager.GetAllLeases(); + var leasesOwnedByOthers = new List(); var renewLeaseTasks = new List(); int ourLeaseCount = 0; @@ -209,7 +209,7 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception foreach (Task getLeaseTask in gettingAllLeases) { try - { + { var lease = await getLeaseTask.ConfigureAwait(false); allLeases[lease.PartitionId] = lease; if (lease.Owner == this.host.HostName) @@ -245,37 +245,36 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception // Wait until we are done with renewing our own leases here. // In theory, this should never throw, error are logged and notified in the renew tasks. - await Task.WhenAll(renewLeaseTasks.ToArray()).ConfigureAwait(false); + await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false); ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished."); // Check any expired leases that we can grab here. - foreach (var possibleLease in allLeases.Values) - { - try + var checkLeaseTasks = new List(); + foreach (var possibleLease in allLeases.Values.Where(lease => lease.Owner != this.host.HostName)) + { + checkLeaseTasks.Add(Task.Run(async () => { - if (await possibleLease.IsExpired().ConfigureAwait(false)) + try { - bool isExpiredLeaseOwned = possibleLease.Owner == this.host.HostName; - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Trying to acquire lease."); - if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false)) + if (await possibleLease.IsExpired().ConfigureAwait(false)) { - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Acquired lease."); - - // Don't double count if we have already counted this lease at the beginning of the loop. - if (!isExpiredLeaseOwned) + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Trying to acquire lease."); + if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false)) { - ourLeaseCount++; + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Acquired lease."); } } } - } - catch (Exception e) - { - ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, possibleLease.PartitionId, "Failure during acquiring lease", e.ToString()); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, possibleLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases); - } + catch (Exception e) + { + ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, possibleLease.PartitionId, "Failure during acquiring lease", e.ToString()); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, possibleLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases); + } + })); } + await Task.WhenAll(checkLeaseTasks); + // Grab more leases if available and needed for load balancing if (leasesOwnedByOthers.Count > 0) {