From cec369a9fa601490220c849dc04036d783d93dbb Mon Sep 17 00:00:00 2001 From: Serkant Karaca Date: Mon, 11 Feb 2019 13:34:24 -0800 Subject: [PATCH] Reduce the number of storage calls in lease manager (#357) * Couple improvements in Azure Lease Manager to reduce numberof storage calls. * N/A as partition id * Go with default timeout * Moving to most recent AMQP release * Fix flaky EPH test * Adding 30 seconds default operation timeout back to tests. * Reducing EPH to storage IO calls. * Couple more fixes * . * Set token for owned leases. * Refresh lease before acquiring in processor host. * Fix metada removal order during lease release. * Update lease token only for already running pumps to avoid resetting receiver position data. * FetchAttributesAsync of blob as part of GetAllLeasesAsync() call. * Refresh lease before attempting to steal * Don't retry if we already lost the lease during receiver open. * Don't attempt to steal if owner has changed from the calculation time to refresh time. * - * Partition pump to close when hit ReceiverDisconnectedException since this is not recoverable. * - * Ignore any failure during releasing the lease * Don't update pump token if token is empty * Nullify the owner on the lease in case this host lost it. * Increment ourLeaseCount when a lease is acquired. * Correcting task list * No need to assign pump lease token to downloaded lease. * comment update * comment update * Clear ownership on partial acquisition. * Clear ownership on partial acquisition. * Make sure we don't leave the lease as owned if acquisition failed. * Adding logs to debug lease corruption bug * Adding logs to debug lease corruption bug * Small fix at steal lease check * Protect subject iterator variable during task creation in for loops. * . * Renew lease right after ChangeLease call * Don't create pump if partition expired or already moved to some other host. * Use refreshed lease while creating partition pump. * Remove temporary debug logs. * Addressing SJ's comments * Remove obsolete --- .../AzureBlobLease.cs | 22 +- .../AzureStorageCheckpointLeaseManager.cs | 118 +++--- .../EventHubPartitionPump.cs | 7 + .../EventProcessorHostActionStrings.cs | 2 + .../ICheckpointManager.cs | 7 - .../ILeaseManager.cs | 2 +- ...Microsoft.Azure.EventHubs.Processor.csproj | 2 +- .../PartitionManager.cs | 352 ++++++++++++------ .../PartitionPump.cs | 13 +- .../Amqp/AmqpPartitionReceiver.cs | 6 + .../Microsoft.Azure.EventHubs.csproj | 2 +- .../Client/DataBatchTests.cs | 2 +- .../Client/NegativeCases.cs | 2 +- .../Processor/ProcessorTestBase.cs | 12 +- .../TestUtility.cs | 3 +- 15 files changed, 362 insertions(+), 190 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs index f1c54cc..0bf9090 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs @@ -9,6 +9,8 @@ namespace Microsoft.Azure.EventHubs.Processor class AzureBlobLease : Lease { + readonly bool isOwned; + // ctor needed for deserialization internal AzureBlobLease() { @@ -17,14 +19,23 @@ internal AzureBlobLease() internal AzureBlobLease(string partitionId, CloudBlockBlob blob) : base(partitionId) { this.Blob = blob; - } + this.isOwned = blob.Properties.LeaseState == LeaseState.Leased; + } + + internal AzureBlobLease(string partitionId, string owner, CloudBlockBlob blob) : base(partitionId) + { + this.Blob = blob; + this.Owner = owner; + this.isOwned = blob.Properties.LeaseState == LeaseState.Leased; + } - internal AzureBlobLease(AzureBlobLease source) + internal AzureBlobLease(AzureBlobLease source) : base(source) { this.Offset = source.Offset; this.SequenceNumber = source.SequenceNumber; this.Blob = source.Blob; + this.isOwned = source.isOwned; } internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(source) @@ -32,17 +43,16 @@ internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(sourc this.Offset = source.Offset; this.SequenceNumber = source.SequenceNumber; this.Blob = blob; + this.isOwned = blob.Properties.LeaseState == LeaseState.Leased; } // do not serialize [JsonIgnore] public CloudBlockBlob Blob { get; } - public override async Task IsExpired() + public override Task IsExpired() { - await this.Blob.FetchAttributesAsync().ConfigureAwait(false); // Get the latest metadata - var currentState = this.Blob.Properties.LeaseState; - return currentState != LeaseState.Leased; + return Task.FromResult(!this.isOwned); } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs index 33aed6c..1e37ea3 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.EventHubs.Processor { using System; using System.Collections.Generic; + using System.Linq; using System.Threading.Tasks; using Microsoft.Azure.EventHubs.Primitives; using Newtonsoft.Json; @@ -13,6 +14,8 @@ namespace Microsoft.Azure.EventHubs.Processor class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseManager { + static string MetaDataOwnerName = "OWNINGHOST"; + EventProcessorHost host; TimeSpan leaseDuration; TimeSpan leaseRenewInterval; @@ -107,7 +110,9 @@ public Task CheckpointStoreExistsAsync() public Task CreateCheckpointStoreIfNotExistsAsync() { - return CreateLeaseStoreIfNotExistsAsync(); + // Because we control the caller, we know that this method will only be called after createLeaseStoreIfNotExists. + // In this implementation, it's the same store, so the store will always exist if execution reaches here. + return Task.FromResult(true); } public async Task GetCheckpointAsync(string partitionId) @@ -126,19 +131,10 @@ public async Task GetCheckpointAsync(string partitionId) return checkpoint; } - [Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] - public Task UpdateCheckpointAsync(Checkpoint checkpoint) - { - throw new NotImplementedException(); - } - - public async Task CreateCheckpointIfNotExistsAsync(string partitionId) + public Task CreateCheckpointIfNotExistsAsync(string partitionId) { // Normally the lease will already be created, checkpoint store is initialized after lease store. - AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false); - Checkpoint checkpoint = new Checkpoint(partitionId, lease.Offset, lease.SequenceNumber); - - return checkpoint; + return Task.FromResult(null); } public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) @@ -228,26 +224,45 @@ public async Task DeleteLeaseStoreAsync() public async Task GetLeaseAsync(string partitionId) // throws URISyntaxException, IOException, StorageException { - AzureBlobLease retval = null; - CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId); - if (await leaseBlob.ExistsAsync(null, this.operationContext).ConfigureAwait(false)) - { - retval = await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false); - } + await leaseBlob.FetchAttributesAsync().ConfigureAwait(false); - return retval; + return await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false); } - public IEnumerable> GetAllLeases() + public async Task> GetAllLeasesAsync() { - IEnumerable partitionIds = this.host.PartitionManager.GetPartitionIdsAsync().WaitAndUnwrapException(); + var leaseList = new List(); + BlobContinuationToken continuationToken = null; - foreach (string id in partitionIds) + do { - yield return GetLeaseAsync(id); - } + var leaseBlobsResult = await this.consumerGroupDirectory.ListBlobsSegmentedAsync( + true, + BlobListingDetails.Metadata, + null, + continuationToken, + null, + this.operationContext); + + foreach (CloudBlockBlob leaseBlob in leaseBlobsResult.Results) + { + // Try getting owner name from existing blob. + // This might return null when run on the existing lease after SDK upgrade. + leaseBlob.Metadata.TryGetValue(MetaDataOwnerName, out var owner); + + // Discover partition id from URI path of the blob. + var partitionId = leaseBlob.Uri.AbsolutePath.Split('/').Last(); + + leaseList.Add(new AzureBlobLease(partitionId, owner, leaseBlob)); + } + + continuationToken = leaseBlobsResult.ContinuationToken; + + } while (continuationToken != null); + + return leaseList; } public async Task CreateLeaseIfNotExistsAsync(string partitionId) // throws URISyntaxException, IOException, StorageException @@ -315,6 +330,7 @@ async Task AcquireLeaseCoreAsync(AzureBlobLease lease) string partitionId = lease.PartitionId; try { + bool renewLease = false; string newToken; await leaseBlob.FetchAttributesAsync(null, null, this.operationContext).ConfigureAwait(false); if (leaseBlob.Properties.LeaseState == LeaseState.Leased) @@ -332,6 +348,7 @@ async Task AcquireLeaseCoreAsync(AzureBlobLease lease) } ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to ChangeLease"); + renewLease = true; newToken = await leaseBlob.ChangeLeaseAsync( newLeaseId, AccessCondition.GenerateLeaseCondition(lease.Token), @@ -341,29 +358,38 @@ async Task AcquireLeaseCoreAsync(AzureBlobLease lease) else { ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to AcquireLease"); - - try - { - newToken = await leaseBlob.AcquireLeaseAsync(leaseDuration, newLeaseId, null, null, this.operationContext).ConfigureAwait(false); - } - catch (StorageException se) - when (se.RequestInformation != null - && se.RequestInformation.ErrorCode.Equals(BlobErrorCodeStrings.LeaseAlreadyPresent, StringComparison.OrdinalIgnoreCase)) - { - // Either some other host grabbed the lease or checkpoint call renewed it. - return false; - } + newToken = await leaseBlob.AcquireLeaseAsync( + leaseDuration, + newLeaseId, + null, + null, + this.operationContext).ConfigureAwait(false); } lease.Token = newToken; lease.Owner = this.host.HostName; lease.IncrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host + + // Renew lease here if needed? + // ChangeLease doesn't renew so we should avoid lease expiring before next renew interval. + if (renewLease) + { + await this.RenewLeaseCoreAsync(lease).ConfigureAwait(false); + } + await leaseBlob.UploadTextAsync( JsonConvert.SerializeObject(lease), null, AccessCondition.GenerateLeaseCondition(lease.Token), null, this.operationContext).ConfigureAwait(false); + + // Update owner in the metadata. + lease.Blob.Metadata[MetaDataOwnerName] = lease.Owner; + await lease.Blob.SetMetadataAsync( + AccessCondition.GenerateLeaseCondition(lease.Token), + null, + this.operationContext).ConfigureAwait(false); } catch (StorageException se) { @@ -418,6 +444,14 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) Token = string.Empty, Owner = string.Empty }; + + // Remove owner in the metadata. + leaseBlob.Metadata.Remove(MetaDataOwnerName); + await leaseBlob.SetMetadataAsync( + AccessCondition.GenerateLeaseCondition(leaseId), + null, + this.operationContext); + await leaseBlob.UploadTextAsync( JsonConvert.SerializeObject(releasedCopy), null, @@ -478,7 +512,7 @@ await leaseBlob.UploadTextAsync( return true; } - async Task DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException + async Task DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException { string jsonLease = await blob.DownloadTextAsync().ConfigureAwait(false); @@ -513,15 +547,7 @@ Exception HandleStorageException(string partitionId, StorageException se) CloudBlockBlob GetBlockBlobReference(string partitionId) { - CloudBlockBlob leaseBlob = this.consumerGroupDirectory.GetBlockBlobReference(partitionId); - - // Fixed, keeping workaround commented until full validation. - // GetBlockBlobReference creates a new ServiceClient thus resets options. - // Because of this we lose settings like MaximumExecutionTime on the client. - // Until storage addresses the issue we need to override it here once more. - // Tracking bug: https://github.com/Azure/azure-storage-net/issues/398 - // leaseBlob.ServiceClient.DefaultRequestOptions = this.storageClient.DefaultRequestOptions; - return leaseBlob; + return this.consumerGroupDirectory.GetBlockBlobReference(partitionId); } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs index f74dc8c..fec94d5 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs @@ -35,6 +35,13 @@ protected override async Task OnOpenAsync() lastException = e; ProcessorEventSource.Log.PartitionPumpWarning( this.Host.HostName, this.PartitionContext.PartitionId, "Failure creating client or receiver, retrying", e.ToString()); + + // Don't retry if we already lost the lease. + if (e is ReceiverDisconnectedException) + { + break; + } + retryCount++; } } diff --git a/src/Microsoft.Azure.EventHubs.Processor/EventProcessorHostActionStrings.cs b/src/Microsoft.Azure.EventHubs.Processor/EventProcessorHostActionStrings.cs index a070254..d974996 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/EventProcessorHostActionStrings.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/EventProcessorHostActionStrings.cs @@ -5,8 +5,10 @@ namespace Microsoft.Azure.EventHubs.Processor { internal static class EventProcessorHostActionStrings { + internal static readonly string DownloadingLeases = "Downloading Leases"; internal static readonly string CheckingLeases = "Checking Leases"; internal static readonly string RenewingLease = "Renewing Lease"; + internal static readonly string ReleasingLease = "Releasing Lease"; internal static readonly string StealingLease = "Stealing Lease"; internal static readonly string CreatingLease = "Creating Lease"; internal static readonly string ClosingEventProcessor = "Closing Event Processor"; diff --git a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs index b606950..d2372f4 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs @@ -38,13 +38,6 @@ public interface ICheckpointManager /// Checkpoint info for the given partition, or null if none has been previously stored. Task GetCheckpointAsync(string partitionId); - /// - /// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint. - /// - /// offset/sequeceNumber to update the store with. - [System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] - Task UpdateCheckpointAsync(Checkpoint checkpoint); - /// /// Create the checkpoint for the given partition if it doesn't exist. Do nothing if it does exist. /// The offset/sequenceNumber for a freshly-created checkpoint should be set to StartOfStream/0. diff --git a/src/Microsoft.Azure.EventHubs.Processor/ILeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ILeaseManager.cs index da98948..a6a207e 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/ILeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/ILeaseManager.cs @@ -65,7 +65,7 @@ public interface ILeaseManager /// A typical implementation could just call GetLeaseAsync() on all partitions. /// /// list of lease info. - IEnumerable> GetAllLeases(); + Task> GetAllLeasesAsync(); /// /// Create in the store the lease info for the given partition, if it does not exist. Do nothing if it does exist diff --git a/src/Microsoft.Azure.EventHubs.Processor/Microsoft.Azure.EventHubs.Processor.csproj b/src/Microsoft.Azure.EventHubs.Processor/Microsoft.Azure.EventHubs.Processor.csproj index 812fc01..e2c0118 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/Microsoft.Azure.EventHubs.Processor.csproj +++ b/src/Microsoft.Azure.EventHubs.Processor/Microsoft.Azure.EventHubs.Processor.csproj @@ -45,7 +45,7 @@ - + diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs index 2e112e5..dbdf80b 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs @@ -92,6 +92,7 @@ async Task RunAsync() } catch (Exception e) { + // Ideally RunLoop should never throw. ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manager main loop, shutting down", e.ToString()); this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.PartitionManagerMainLoop); } @@ -124,8 +125,8 @@ async Task InitializeStoresAsync() //throws InterruptedException, ExecutionExcep // Now make sure the leases exist foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false)) { - await RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(id), id, "Failure creating lease for partition, retrying", - "Out of retries creating lease for partition", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false); + await RetryAsync(() => leaseManager.CreateLeaseIfNotExistsAsync(id), id, $"Failure creating lease for partition {id}, retrying", + $"Out of retries creating lease for partition {id}", EventProcessorHostActionStrings.CreatingLease, 5).ConfigureAwait(false); } // Make sure the checkpoint store exists @@ -141,8 +142,8 @@ async Task InitializeStoresAsync() //throws InterruptedException, ExecutionExcep // Now make sure the checkpoints exist foreach (string id in await this.GetPartitionIdsAsync().ConfigureAwait(false)) { - await RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(id), id, "Failure creating checkpoint for partition, retrying", - "Out of retries creating checkpoint blob for partition", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false); + await RetryAsync(() => checkpointManager.CreateCheckpointIfNotExistsAsync(id), id, $"Failure creating checkpoint for partition {id}, retrying", + $"Out of retries creating checkpoint for partition {id}", EventProcessorHostActionStrings.CreatingCheckpoint, 5).ConfigureAwait(false); } } @@ -201,141 +202,232 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception loopStopwatch.Restart(); ILeaseManager leaseManager = this.host.LeaseManager; - Dictionary allLeases = new Dictionary(); + var allLeases = new ConcurrentDictionary(); + var leasesOwnedByOthers = new ConcurrentDictionary(); // Inspect all leases. // Acquire any expired leases. // Renew any leases that currently belong to us. - var gettingAllLeases = leaseManager.GetAllLeases(); - var leasesOwnedByOthers = new List(); + IEnumerable downloadedLeases; var renewLeaseTasks = new List(); int ourLeaseCount = 0; - // First thing is first, renew owned leases. - foreach (Task getLeaseTask in gettingAllLeases) - { + try + { try { - var lease = await getLeaseTask.ConfigureAwait(false); - allLeases[lease.PartitionId] = lease; - if (lease.Owner == this.host.HostName) - { - ourLeaseCount++; - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, lease.PartitionId, "Trying to renew lease."); - renewLeaseTasks.Add(leaseManager.RenewLeaseAsync(lease).ContinueWith(renewResult => - { - if (renewResult.IsFaulted || !renewResult.WaitAndUnwrapException()) - { - // Might have failed due to intermittent error or lease-lost. - // Just log here, expired leases will be picked by same or another host anyway. - ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, lease.PartitionId, "Failed to renew lease.", renewResult.Exception?.Message); - this.host.EventProcessorOptions.NotifyOfException( - this.host.HostName, - lease.PartitionId, - renewResult.Exception, - EventProcessorHostActionStrings.RenewingLease); - } - }, cancellationToken)); - } - else - { - leasesOwnedByOthers.Add(lease); - } + downloadedLeases = await leaseManager.GetAllLeasesAsync(); } catch (Exception e) { - ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during checking lease.", e.ToString()); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.CheckingLeases); - } - } + ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception during downloading leases", e.Message); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.DownloadingLeases); - // Wait until we are done with renewing our own leases here. - // In theory, this should never throw, error are logged and notified in the renew tasks. - await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false); - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished."); + // Avoid tight spin if getallleases call keeps failing. + await Task.Delay(1000); - // Check any expired leases that we can grab here. - var checkLeaseTasks = new List(); - foreach (var possibleLease in allLeases.Values.Where(lease => lease.Owner != this.host.HostName)) - { - checkLeaseTasks.Add(Task.Run(async () => + continue; + } + + // First things first, renew owned leases. + foreach (var lease in downloadedLeases) { + var subjectLease = lease; + try { - if (await possibleLease.IsExpired().ConfigureAwait(false)) + allLeases[subjectLease.PartitionId] = subjectLease; + if (subjectLease.Owner == this.host.HostName) { - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Trying to acquire lease."); - if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false)) + ourLeaseCount++; + + // Get lease from partition since we need the token at this point. + if (!this.partitionPumps.TryGetValue(subjectLease.PartitionId, out var capturedPump)) { - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, possibleLease.PartitionId, "Acquired lease."); + continue; } + + var capturedLease = capturedPump.Lease; + + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, capturedLease.PartitionId, "Trying to renew lease."); + renewLeaseTasks.Add(leaseManager.RenewLeaseAsync(capturedLease).ContinueWith(renewResult => + { + if (renewResult.IsFaulted) + { + // Might have failed due to intermittent error or lease-lost. + // Just log here, expired leases will be picked by same or another host anyway. + ProcessorEventSource.Log.PartitionPumpError( + this.host.HostName, + capturedLease.PartitionId, + "Failed to renew lease.", + renewResult.Exception?.Message); + + this.host.EventProcessorOptions.NotifyOfException( + this.host.HostName, + capturedLease.PartitionId, + renewResult.Exception, + EventProcessorHostActionStrings.RenewingLease); + + // Nullify the owner on the lease in case this host lost it. + // This helps to remove pump earlier reducing duplicate receives. + if (renewResult.Exception?.GetBaseException() is LeaseLostException) + { + allLeases[capturedLease.PartitionId].Owner = null; + } + } + }, cancellationToken)); + } + else if (!await subjectLease.IsExpired().ConfigureAwait(false)) + { + leasesOwnedByOthers[subjectLease.PartitionId] = subjectLease; } } catch (Exception e) { - ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, possibleLease.PartitionId, "Failure during acquiring lease", e.ToString()); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, possibleLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases); + ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during checking lease.", e.ToString()); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.CheckingLeases); } - }, cancellationToken)); - } + } - await Task.WhenAll(checkLeaseTasks); + // Wait until we are done with renewing our own leases here. + // In theory, this should never throw, error are logged and notified in the renew tasks. + await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false); + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished."); - // Grab more leases if available and needed for load balancing - if (leasesOwnedByOthers.Count > 0) - { - Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers, ourLeaseCount); - if (stealThisLease != null) + // Check any expired leases that we can grab here. + var checkLeaseTasks = new List(); + foreach (var possibleLease in allLeases.Values.Where(lease => lease.Owner != this.host.HostName)) { - try + var subjectLease = possibleLease; + + checkLeaseTasks.Add(Task.Run(async () => { - ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.HostName, stealThisLease.PartitionId); - if (await leaseManager.AcquireLeaseAsync(stealThisLease).ConfigureAwait(false)) + try { - // Succeeded in stealing lease - ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.HostName, stealThisLease.PartitionId); + if (await subjectLease.IsExpired().ConfigureAwait(false)) + { + // Get fresh content of lease subject to acquire. + var downloadedLease = await leaseManager.GetLeaseAsync(subjectLease.PartitionId).ConfigureAwait(false); + allLeases[subjectLease.PartitionId] = downloadedLease; + + // Check expired once more here incase another host have already leased this since we populated the list. + if (await downloadedLease.IsExpired().ConfigureAwait(false)) + { + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Trying to acquire lease."); + if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false)) + { + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, downloadedLease.PartitionId, "Acquired lease."); + leasesOwnedByOthers.TryRemove(downloadedLease.PartitionId, out var removedLease); + Interlocked.Increment(ref ourLeaseCount); + } + else + { + // Acquisition failed. Make sure we don't leave the lease as owned. + allLeases[subjectLease.PartitionId].Owner = null; + + ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, + "Failed to acquire lease for partition " + downloadedLease.PartitionId, null); + } + } + } } - else + catch (Exception e) { - ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, - "Failed to steal lease for partition " + stealThisLease.PartitionId, null); + ProcessorEventSource.Log.PartitionPumpError(this.host.HostName, subjectLease.PartitionId, "Failure during acquiring lease", e.ToString()); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectLease.PartitionId, e, EventProcessorHostActionStrings.CheckingLeases); + + // Acquisition failed. Make sure we don't leave the lease as owned. + allLeases[subjectLease.PartitionId].Owner = null; } - } - catch (Exception e) - { - ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, - "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString()); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, - stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease); - } + }, cancellationToken)); } - } - // Update pump with new state of leases. - foreach (string partitionId in allLeases.Keys) - { - try + await Task.WhenAll(checkLeaseTasks).ConfigureAwait(false); + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Expired lease check is finished."); + + // Grab more leases if available and needed for load balancing + if (leasesOwnedByOthers.Count > 0) { - Lease updatedLease = allLeases[partitionId]; - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Lease on partition {updatedLease.PartitionId} owned by {updatedLease.Owner}"); - if (updatedLease.Owner == this.host.HostName) - { - await this.CheckAndAddPumpAsync(partitionId, updatedLease).ConfigureAwait(false); - } - else + Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers.Values, ourLeaseCount); + + // Don't attempt to steal the lease if current host has a pump for this partition id + // This is possible when current pump is in failed state due to lease moved to some other host. + if (stealThisLease != null && !this.partitionPumps.ContainsKey(stealThisLease.PartitionId)) { - await this.RemovePumpAsync(partitionId, CloseReason.LeaseLost).ConfigureAwait(false); + try + { + // Get fresh content of lease subject to acquire. + var downloadedLease = await leaseManager.GetLeaseAsync(stealThisLease.PartitionId).ConfigureAwait(false); + allLeases[stealThisLease.PartitionId] = downloadedLease; + + // Don't attempt to steal if lease is already expired. + // Expired leases are picked up by other hosts quickly. + // Don't attempt to steal if owner has changed from the calculation time to refresh time. + if (!await downloadedLease.IsExpired().ConfigureAwait(false) + && downloadedLease.Owner == stealThisLease.Owner) + { + ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.HostName, downloadedLease.PartitionId); + if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false)) + { + // Succeeded in stealing lease + ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.HostName, downloadedLease.PartitionId); + ourLeaseCount++; + } + else + { + // Acquisition failed. Make sure we don't leave the lease as owned. + allLeases[stealThisLease.PartitionId].Owner = null; + + ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName, + "Failed to steal lease for partition " + downloadedLease.PartitionId, null); + } + } + } + catch (Exception e) + { + ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, + "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString()); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, + stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease); + + // Acquisition failed. Make sure we don't leave the lease as owned. + allLeases[stealThisLease.PartitionId].Owner = null; + } } } - catch (Exception e) + + // Update pump with new state of leases on owned partitions in parallel. + var createRemovePumpTasks = new List(); + foreach (string partitionId in allLeases.Keys) { - ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception during add/remove pump on partition {partitionId}", e.Message); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, partitionId, e, EventProcessorHostActionStrings.PartitionPumpManagement); + var subjectPartitionId = partitionId; + + createRemovePumpTasks.Add(Task.Run(async () => + { + try + { + Lease updatedLease = allLeases[subjectPartitionId]; + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Lease on partition {updatedLease.PartitionId} owned by {updatedLease.Owner}"); + if (updatedLease.Owner == this.host.HostName) + { + await this.CheckAndAddPumpAsync(subjectPartitionId, updatedLease).ConfigureAwait(false); + } + else + { + await this.TryRemovePumpAsync(subjectPartitionId, CloseReason.LeaseLost).ConfigureAwait(false); + } + } + catch (Exception e) + { + ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception during add/remove pump on partition {subjectPartitionId}", e.Message); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartitionId, e, EventProcessorHostActionStrings.PartitionPumpManagement); + } + }, cancellationToken)); } - } - try - { + await Task.WhenAll(createRemovePumpTasks).ConfigureAwait(false); + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Pump update is finished."); + // Consider reducing the wait time with last lease-walkthrough's time taken. var elapsedTime = loopStopwatch.Elapsed; if (leaseManager.LeaseRenewInterval > elapsedTime) @@ -343,9 +435,17 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception await Task.Delay(leaseManager.LeaseRenewInterval.Subtract(elapsedTime), cancellationToken).ConfigureAwait(false); } } - catch (TaskCanceledException) + catch (Exception e) { - // Bail on the async work if we are canceled. + // TaskCancelledException is expected furing host unregister. + if (e is TaskCanceledException) + { + continue; + } + + // Loop should not exit unless signalled via cancellation token. Log any failures and continue. + ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manager main loop, continuing", e.Message); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.PartitionPumpManagement); } } } @@ -359,31 +459,50 @@ async Task CheckAndAddPumpAsync(string partitionId, Lease lease) if (capturedPump.PumpStatus == PartitionPumpStatus.Errored || capturedPump.IsClosing) { // The existing pump is bad. Remove it. - await RemovePumpAsync(partitionId, CloseReason.Shutdown).ConfigureAwait(false); + await TryRemovePumpAsync(partitionId, CloseReason.Shutdown).ConfigureAwait(false); } else { - // Pump is working, just replace the lease. - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Updating lease for pump"); - capturedPump.SetLease(lease); + // Lease token can show up empty here if lease content download has failed or not recently acquired. + // Don't update the token if so. + if (!string.IsNullOrWhiteSpace(lease.Token)) + { + // Pump is working, just replace the lease token. + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Updating lease token for pump"); + capturedPump.SetLeaseToken(lease.Token); + } + else + { + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Skipping to update lease token for pump"); + } } } else { // No existing pump, create a new one. - await CreateNewPumpAsync(partitionId, lease).ConfigureAwait(false); + await CreateNewPumpAsync(partitionId).ConfigureAwait(false); } } - async Task CreateNewPumpAsync(string partitionId, Lease lease) + async Task CreateNewPumpAsync(string partitionId) { - PartitionPump newPartitionPump = new EventHubPartitionPump(this.host, lease); + // Refresh lease content and do last minute check to reduce partition moves. + var refreshedLease = await this.host.LeaseManager.GetLeaseAsync(partitionId); + if (refreshedLease.Owner != this.host.HostName || await refreshedLease.IsExpired().ConfigureAwait(false)) + { + // Partition moved to some other node after lease acquisition. + // Return w/o creating the pump. + ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, $"Partition moved to another host or expired after acquisition."); + return; + } + + PartitionPump newPartitionPump = new EventHubPartitionPump(this.host, refreshedLease); await newPartitionPump.OpenAsync().ConfigureAwait(false); this.partitionPumps.TryAdd(partitionId, newPartitionPump); // do the put after start, if the start fails then put doesn't happen ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "Created new PartitionPump"); } - async Task RemovePumpAsync(string partitionId, CloseReason reason) + async Task TryRemovePumpAsync(string partitionId, CloseReason reason) { PartitionPump capturedPump; if (this.partitionPumps.TryRemove(partitionId, out capturedPump)) @@ -394,12 +513,6 @@ async Task RemovePumpAsync(string partitionId, CloseReason reason) } // else, pump is already closing/closed, don't need to try to shut it down again } - else - { - // PartitionManager main loop tries to remove pump for every partition that the host does not own, just to be sure. - // Not finding a pump for a partition is normal and expected most of the time. - ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, partitionId, "No pump found to remove for this partition"); - } } Task RemoveAllPumpsAsync(CloseReason reason) @@ -408,15 +521,22 @@ Task RemoveAllPumpsAsync(CloseReason reason) var keys = new List(this.partitionPumps.Keys); foreach (string partitionId in keys) { - tasks.Add(this.RemovePumpAsync(partitionId, reason)); + tasks.Add(this.TryRemovePumpAsync(partitionId, reason)); } return Task.WhenAll(tasks); } - Lease WhichLeaseToSteal(List stealableLeases, int haveLeaseCount) + Lease WhichLeaseToSteal(IEnumerable stealableLeases, int haveLeaseCount) { IDictionary countsByOwner = CountLeasesByOwner(stealableLeases); + + // Consider all leases might be already released where we won't have any entry in the return counts map. + if (countsByOwner.Count == 0) + { + return null; + } + var biggestOwner = countsByOwner.OrderByDescending(o => o.Value).First(); Lease stealThisLease = null; @@ -450,7 +570,7 @@ Lease WhichLeaseToSteal(List stealableLeases, int haveLeaseCount) Dictionary CountLeasesByOwner(IEnumerable leases) { - var counts = leases.GroupBy(lease => lease.Owner).Select(group => new + var counts = leases.Where(lease => lease.Owner != null).GroupBy(lease => lease.Owner).Select(group => new { Owner = group.Key, Count = group.Count() diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs index d39e85d..85c66ae 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs @@ -23,7 +23,7 @@ protected PartitionPump(EventProcessorHost host, Lease lease) protected EventProcessorHost Host { get; } - protected Lease Lease { get; } + protected internal Lease Lease { get; } protected IEventProcessor Processor { get; private set; } @@ -31,9 +31,9 @@ protected PartitionPump(EventProcessorHost host, Lease lease) protected AsyncLock ProcessingAsyncLock { get; } - internal void SetLease(Lease newLease) + internal void SetLeaseToken(string newToken) { - this.PartitionContext.Lease = newLease; + this.PartitionContext.Lease.Token = newToken; } public async Task OpenAsync() @@ -124,12 +124,15 @@ public async Task CloseAsync(CloseReason reason) if (reason != CloseReason.LeaseLost) { // Since this pump is dead, release the lease. - // Ignore LeaseLostException try { await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); } - catch (LeaseLostException) { } + catch (Exception e) + { + // Log and ignore any failure since expired lease will be picked by another host. + this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease); + } } this.PumpStatus = PartitionPumpStatus.Closed; diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs index 167fd62..bade778 100644 --- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs +++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs @@ -312,6 +312,12 @@ async Task ReceivePumpAsync(CancellationToken cancellationToken, bool invokeWhen } catch { } + // ReceiverDisconnectedException is a special case where we know we cannot recover the pump. + if (e is ReceiverDisconnectedException) + { + break; + } + continue; } diff --git a/src/Microsoft.Azure.EventHubs/Microsoft.Azure.EventHubs.csproj b/src/Microsoft.Azure.EventHubs/Microsoft.Azure.EventHubs.csproj index cc87116..972fa65 100644 --- a/src/Microsoft.Azure.EventHubs/Microsoft.Azure.EventHubs.csproj +++ b/src/Microsoft.Azure.EventHubs/Microsoft.Azure.EventHubs.csproj @@ -68,7 +68,7 @@ - + diff --git a/test/Microsoft.Azure.EventHubs.Tests/Client/DataBatchTests.cs b/test/Microsoft.Azure.EventHubs.Tests/Client/DataBatchTests.cs index 6cb1df2..072e88f 100644 --- a/test/Microsoft.Azure.EventHubs.Tests/Client/DataBatchTests.cs +++ b/test/Microsoft.Azure.EventHubs.Tests/Client/DataBatchTests.cs @@ -137,7 +137,7 @@ protected async Task SendWithEventDataBatch( // We will send a thousand messages where each message is 1K. var totalSent = 0; var rnd = new Random(); - TestUtility.Log($"Starting to send."); + TestUtility.Log("Starting to send."); do { // Send random body size. diff --git a/test/Microsoft.Azure.EventHubs.Tests/Client/NegativeCases.cs b/test/Microsoft.Azure.EventHubs.Tests/Client/NegativeCases.cs index 17be408..9b538ed 100644 --- a/test/Microsoft.Azure.EventHubs.Tests/Client/NegativeCases.cs +++ b/test/Microsoft.Azure.EventHubs.Tests/Client/NegativeCases.cs @@ -150,7 +150,7 @@ async Task GetPartitionRuntimeInformationFromInvalidPartition() foreach (var invalidPartitionId in invalidPartitions) { - await Assert.ThrowsAsync(async () => + await Assert.ThrowsAsync(async () => { TestUtility.Log($"Getting partition information from invalid partition {invalidPartitionId}"); await this.EventHubClient.GetPartitionRuntimeInformationAsync(invalidPartitionId); diff --git a/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs b/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs index 29bf0a8..1db6653 100644 --- a/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs +++ b/test/Microsoft.Azure.EventHubs.Tests/Processor/ProcessorTestBase.cs @@ -152,7 +152,9 @@ async Task MultipleProcessorHosts() // Prepare host trackers. var hostReceiveEvents = new ConcurrentDictionary(); + var containerName = Guid.NewGuid().ToString(); var hosts = new List(); + try { for (int hostId = 0; hostId < hostCount; hostId++) @@ -163,11 +165,11 @@ async Task MultipleProcessorHosts() TestUtility.Log("Creating EventProcessorHost"); var eventProcessorHost = new EventProcessorHost( thisHostName, - string.Empty, // Passing empty as entity path here rsince path is already in EH connection string. + string.Empty, // Passing empty as entity path here since path is already in EH connection string. PartitionReceiver.DefaultConsumerGroupName, TestUtility.EventHubsConnectionString, TestUtility.StorageConnectionString, - Guid.NewGuid().ToString()); + containerName); hosts.Add(eventProcessorHost); TestUtility.Log($"Calling RegisterEventProcessorAsync"); var processorOptions = new EventProcessorOptions @@ -673,7 +675,9 @@ async Task HostShouldRecoverAfterReceiverDisconnection() }; }; - await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory); + var epo = EventProcessorOptions.DefaultOptions; + epo.ReceiveTimeout = TimeSpan.FromSeconds(10); + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); // Wait 15 seconds then create a new epoch receiver. // This will trigger ReceiverDisconnectedExcetion in the host. @@ -685,7 +689,7 @@ async Task HostShouldRecoverAfterReceiverDisconnection() targetPartition, EventPosition.FromStart(), 2); await externalReceiver.ReceiveAsync(100, TimeSpan.FromSeconds(5)); - // Give another 1 minute for host to recover then do the validatins. + // Give another 1 minute for host to recover then do the validations. await Task.Delay(60000); TestUtility.Log("Verifying that host was able to receive ReceiverDisconnectedException"); diff --git a/test/Microsoft.Azure.EventHubs.Tests/TestUtility.cs b/test/Microsoft.Azure.EventHubs.Tests/TestUtility.cs index 7db0954..4db6d06 100644 --- a/test/Microsoft.Azure.EventHubs.Tests/TestUtility.cs +++ b/test/Microsoft.Azure.EventHubs.Tests/TestUtility.cs @@ -38,6 +38,7 @@ static TestUtility() // Update operation timeout on ConnectionStringBuilder. ehCsb.OperationTimeout = TimeSpan.FromSeconds(30); + EventHubsConnectionString = ehCsb.ToString(); } @@ -62,7 +63,7 @@ internal static Task SendToPartitionAsync(EventHubClient ehClient, string partit internal static async Task SendToPartitionAsync(EventHubClient ehClient, string partitionId, EventData eventData, int numberOfMessages = 1) { - TestUtility.Log($"Starting to send {numberOfMessages} to partition {partitionId}."); + TestUtility.Log($"Starting to send {numberOfMessages} messages to partition {partitionId}."); var partitionSender = ehClient.CreatePartitionSender(partitionId); for (int i = 0; i < numberOfMessages; i++)