From 4062f5b1c0d68185a71f96e764444e0fbb15e0bb Mon Sep 17 00:00:00 2001 From: serkar Date: Wed, 25 Jan 2017 11:11:06 -0800 Subject: [PATCH 01/12] Fixing build break introduced with preview check-in. --- src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs index a04e10e..9b2b184 100644 --- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs +++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpEventDataSender.cs @@ -49,7 +49,7 @@ protected override async Task OnSendAsync(IEnumerable eventDatas, str do { - using (AmqpMessage amqpMessage = AmqpMessageConverter.EventDatasToAmqpMessage(eventDatas, partitionKey, true)) + using (AmqpMessage amqpMessage = AmqpMessageConverter.EventDatasToAmqpMessage(eventDatas, partitionKey)) { shouldRetry = false; From 317720d3b31ef12cddabf49773b2232a3eb62e81 Mon Sep 17 00:00:00 2001 From: serkar Date: Tue, 7 Feb 2017 09:37:50 -0800 Subject: [PATCH 02/12] Here is the list of the changes: + Moving Offset and SequenceNumber to base class Lease.cs + Changing AzureStorageCheckpointLeaseManager.cs to stop downloading the lease at every checkpoint call. This was causing 2 hosts to operate on the same lease for a while. + Set MaximumExecutionTime on renewRequestOptions used while renewing storage lease. + Throw LeaseLostException when lease is lost on operations like Renew and Update. + Updating interface ICheckpointManager to accept Lease while updating checkpoint. + Some LINQ improvements in the lease steal and balancing logic. + Updating multiple-hosts unit test to validate load balancing. 
--- .../AzureBlobLease.cs | 6 - .../AzureStorageCheckpointLeaseManager.cs | 53 ++++---- .../ICheckpointManager.cs | 3 +- .../Lease.cs | 7 +- .../LeaseLostException.cs | 1 - .../PartitionContext.cs | 10 +- .../PartitionManager.cs | 120 +++++++----------- .../PartitionPump.cs | 7 +- .../EventProcessorHostTests.cs | 39 ++++-- 9 files changed, 120 insertions(+), 126 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs index 769d04f..f1c54cc 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs @@ -10,7 +10,6 @@ namespace Microsoft.Azure.EventHubs.Processor class AzureBlobLease : Lease { // ctor needed for deserialization - internal AzureBlobLease() { } @@ -36,14 +35,9 @@ internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(sourc } // do not serialize - [JsonIgnore] public CloudBlockBlob Blob { get; } - public string Offset { get; set; } - - public long SequenceNumber { get; set; } - public override async Task IsExpired() { await this.Blob.FetchAttributesAsync().ConfigureAwait(false); // Get the latest metadata diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs index 6352ac9..2025a74 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs @@ -28,7 +28,13 @@ sealed class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseMana static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2); static readonly TimeSpan leaseDuration = TimeSpan.FromSeconds(30); static readonly TimeSpan leaseRenewInterval = TimeSpan.FromSeconds(10); - readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions(); + + // Lease 
renew calls shouldn't wait more than leaseRenewInterval + readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions() + { + ServerTimeout = leaseRenewInterval, + MaximumExecutionTime = TimeSpan.FromMinutes(1) + }; internal AzureStorageCheckpointLeaseManager(string storageConnectionString, string leaseContainerName, string storageBlobPrefix) { @@ -106,13 +112,12 @@ public async Task CreateCheckpointIfNotExistsAsync(string partitionI return checkpoint; } - public async Task UpdateCheckpointAsync(Checkpoint checkpoint) + public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) { - // Need to fetch the most current lease data so that we can update it correctly. - AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(checkpoint.PartitionId).ConfigureAwait(false); - lease.Offset = checkpoint.Offset; - lease.SequenceNumber = checkpoint.SequenceNumber; - await UpdateLeaseAsync(lease).ConfigureAwait(false); + AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease)lease); + newLease.Offset = checkpoint.Offset; + newLease.SequenceNumber = checkpoint.SequenceNumber; + await UpdateLeaseAsync(newLease).ConfigureAwait(false); } public Task DeleteCheckpointAsync(string partitionId) @@ -331,8 +336,8 @@ public Task RenewLeaseAsync(Lease lease) async Task RenewLeaseCoreAsync(AzureBlobLease lease) { CloudBlockBlob leaseBlob = lease.Blob; - bool retval = true; string partitionId = lease.PartitionId; + try { await leaseBlob.RenewLeaseAsync(AccessCondition.GenerateLeaseCondition(lease.Token), this.renewRequestOptions, null).ConfigureAwait(false); @@ -341,15 +346,13 @@ async Task RenewLeaseCoreAsync(AzureBlobLease lease) { if (WasLeaseLost(partitionId, se)) { - retval = false; - } - else - { - throw; + throw new LeaseLostException(lease, se); } + + throw; } - return retval; + return true; } public Task ReleaseLeaseAsync(Lease lease) @@ -362,8 +365,8 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) 
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, lease.PartitionId, "Releasing lease"); CloudBlockBlob leaseBlob = lease.Blob; - bool retval = true; string partitionId = lease.PartitionId; + try { string leaseId = lease.Token; @@ -379,15 +382,13 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) { if (WasLeaseLost(partitionId, se)) { - retval = false; - } - else - { - throw; + throw new LeaseLostException(lease, se); } + + throw; } - return retval; + return true; } public Task UpdateLeaseAsync(Lease lease) @@ -410,12 +411,9 @@ async Task UpdateLeaseCoreAsync(AzureBlobLease lease) { return false; } - - // First, renew the lease to make sure the update will go through. - if (!await this.RenewLeaseAsync(lease).ConfigureAwait(false)) - { - return false; - } + + // First, renew the lease to make sure the update will go through. + await this.RenewLeaseAsync(lease).ConfigureAwait(false); CloudBlockBlob leaseBlob = lease.Blob; try @@ -469,6 +467,7 @@ bool WasLeaseLost(string partitionId, StorageException se) } } } + return retval; } diff --git a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs index 6ec6da2..d2372f4 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs @@ -49,8 +49,9 @@ public interface ICheckpointManager /// /// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint. /// + /// Partition information against which to perform a checkpoint. /// offset/sequeceNumber to update the store with. - Task UpdateCheckpointAsync(Checkpoint checkpoint); + Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint); /// /// Delete the stored checkpoint for the given partition. 
If there is no stored checkpoint for the diff --git a/src/Microsoft.Azure.EventHubs.Processor/Lease.cs b/src/Microsoft.Azure.EventHubs.Processor/Lease.cs index 23905f4..877dd63 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/Lease.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/Lease.cs @@ -26,6 +26,10 @@ protected Lease(Lease source) this.Token = source.Token; } + public string Offset { get; set; } + + public long SequenceNumber { get; set; } + public string PartitionId { get; set; } public string Owner { get; set; } @@ -36,7 +40,8 @@ protected Lease(Lease source) public virtual Task IsExpired() { - // this function is meaningless in the base class + // By default lease never expires. + // Deriving class will implement the lease expiry logic. return Task.FromResult(false); } diff --git a/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs b/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs index 1e86902..1738a78 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/LeaseLostException.cs @@ -20,7 +20,6 @@ internal LeaseLostException(Lease lease, Exception innerException) this.lease = lease; } - // We don't want to expose Lease to the public. 
public string PartitionId { get { return this.lease.PartitionId; } diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs index 27222d2..2f958ac 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs @@ -175,12 +175,14 @@ async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOf { if (inStoreCheckpoint == null) { - inStoreCheckpoint = await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false); + await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false); } - inStoreCheckpoint.Offset = checkpoint.Offset; - inStoreCheckpoint.SequenceNumber = checkpoint.SequenceNumber; - await this.host.CheckpointManager.UpdateCheckpointAsync(inStoreCheckpoint).ConfigureAwait(false); + await this.host.CheckpointManager.UpdateCheckpointAsync(this.Lease, checkpoint).ContinueWith((obj) => + { + this.Lease.Offset = checkpoint.Offset; + this.Lease.SequenceNumber = checkpoint.SequenceNumber; + }).ConfigureAwait(false); } else { diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs index 8edada2..9bb3e17 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs @@ -192,19 +192,19 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception // Renew any leases that currently belong to us. 
IEnumerable> gettingAllLeases = leaseManager.GetAllLeases(); List leasesOwnedByOthers = new List(); - int ourLeasesCount = 0; - foreach (Task getLeastTask in gettingAllLeases) + int ourLeaseCount = 0; + foreach (Task getLeaseTask in gettingAllLeases) { try { - Lease possibleLease = await getLeastTask.ConfigureAwait(false); + Lease possibleLease = await getLeaseTask.ConfigureAwait(false); allLeases[possibleLease.PartitionId] = possibleLease; if (await possibleLease.IsExpired().ConfigureAwait(false)) { ProcessorEventSource.Log.PartitionPumpInfo(this.host.Id, possibleLease.PartitionId, "Trying to acquire lease."); if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false)) { - ourLeasesCount++; + ourLeaseCount++; } else { @@ -215,11 +215,15 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception else if (possibleLease.Owner == this.host.HostName) { ProcessorEventSource.Log.PartitionPumpInfo(this.host.Id, possibleLease.PartitionId, "Trying to renew lease."); - if (await leaseManager.RenewLeaseAsync(possibleLease).ConfigureAwait(false)) + + // Try to renew the lease. If successful then this lease belongs to us, + // if throws LeaseLostException then we don't own it anymore. 
+ try { - ourLeasesCount++; + await leaseManager.RenewLeaseAsync(possibleLease).ConfigureAwait(false); + ourLeaseCount++; } - else + catch (LeaseLostException) { // Probably failed because another host stole it between get and renew leasesOwnedByOthers.Add(possibleLease); @@ -240,31 +244,30 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception // Grab more leases if available and needed for load balancing if (leasesOwnedByOthers.Count > 0) { - IEnumerable stealTheseLeases = WhichLeasesToSteal(leasesOwnedByOthers, ourLeasesCount); - if (stealTheseLeases != null) + Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers, ourLeaseCount); + if (stealThisLease != null) { - foreach (Lease stealee in stealTheseLeases) + try { - try + ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.Id, stealThisLease.PartitionId); + if (await leaseManager.AcquireLeaseAsync(stealThisLease).ConfigureAwait(false)) { - ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.Id, stealee.PartitionId); - if (await leaseManager.AcquireLeaseAsync(stealee).ConfigureAwait(false)) - { - // Succeeded in stealing lease - ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.Id, stealee.PartitionId); - ourLeasesCount++; - } - else - { - ProcessorEventSource.Log.EventProcessorHostWarning(this.host.Id, "Failed to steal lease for partition " + stealee.PartitionId, null); - } + // Succeeded in stealing lease + ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.Id, stealThisLease.PartitionId); } - catch (Exception e) + else { - ProcessorEventSource.Log.EventProcessorHostError(this.host.Id, "Exception during stealing lease for partition " + stealee.PartitionId, e.ToString()); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, stealee.PartitionId, e, EventProcessorHostActionStrings.StealingLease); + ProcessorEventSource.Log.EventProcessorHostWarning(this.host.Id, + "Failed to steal lease for partition " + 
stealThisLease.PartitionId, null); } } + catch (Exception e) + { + ProcessorEventSource.Log.EventProcessorHostError(this.host.Id, + "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString()); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, + stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease); + } } } @@ -366,12 +369,11 @@ Task RemoveAllPumpsAsync(CloseReason reason) return Task.WhenAll(tasks); } - IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLeaseCount) + Lease WhichLeaseToSteal(List stealableLeases, int haveLeaseCount) { IDictionary countsByOwner = CountLeasesByOwner(stealableLeases); - string biggestOwner = FindBiggestOwner(countsByOwner); - int biggestCount = countsByOwner[biggestOwner]; - List stealTheseLeases = null; + var biggestOwner = countsByOwner.OrderByDescending(o => o.Value).First(); + Lease stealThisLease = null; // If the number of leases is a multiple of the number of hosts, then the desired configuration is // that all hosts own the name number of leases, and the difference between the "biggest" owner and @@ -384,7 +386,7 @@ IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLease // // In either case, if the difference between this host and the biggest owner is 2 or more, then the // system is not in the most evenly-distributed configuration, so steal one lease from the biggest. - // If there is a tie for biggest, findBiggestOwner() picks whichever appears first in the list because + // If there is a tie for biggest, we pick whichever appears first in the list because // it doesn't really matter which "biggest" is trimmed down. // // Stealing one at a time prevents flapping because it reduces the difference between the biggest and @@ -392,61 +394,31 @@ IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLease // end up below 0. 
This host may become tied for biggest, but it cannot become larger than the host that // it is stealing from. - if ((biggestCount - haveLeaseCount) >= 2) + if ((biggestOwner.Value - haveLeaseCount) >= 2) { - stealTheseLeases = new List(); - foreach (Lease l in stealableLeases) - { - if (l.Owner == biggestOwner) - { - stealTheseLeases.Add(l); - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Proposed to steal lease for partition {l.PartitionId} from {biggestOwner}"); - break; - } - } + stealThisLease = stealableLeases.Where(l => l.Owner == biggestOwner.Key).First(); + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Proposed to steal lease for partition {stealThisLease.PartitionId} from {biggestOwner.Key}"); } - return stealTheseLeases; + return stealThisLease; } - string FindBiggestOwner(IDictionary countsByOwner) + Dictionary CountLeasesByOwner(IEnumerable leases) { - int biggestCount = 0; - string biggestOwner = null; - foreach (string owner in countsByOwner.Keys) - { - if (countsByOwner[owner] > biggestCount) - { - biggestCount = countsByOwner[owner]; - biggestOwner = owner; - } - } - return biggestOwner; - } + var counts = leases.GroupBy(lease => lease.Owner).Select(group => new { + Owner = group.Key, + Count = group.Count() + }); - IDictionary CountLeasesByOwner(IEnumerable leases) - { - IDictionary counts = new Dictionary(); - foreach (Lease l in leases) + // Log ownership mapping. 
+ foreach (var owner in counts) { - if (counts.ContainsKey(l.Owner)) - { - int oldCount = counts[l.Owner]; - counts[l.Owner] = oldCount + 1; - } - else - { - counts[l.Owner] = 1; - } + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Host {owner.Owner} owns {owner.Count} leases"); } - foreach (string owner in counts.Keys) - { - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Host {owner} owns {counts[owner]} leases"); - } + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Total hosts in list: {counts.Count()}"); - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Total hosts in list: {counts.Count}"); - return counts; + return counts.ToDictionary(e => e.Owner, e => e.Count); } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs index d8d7140..0430ae5 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs @@ -112,7 +112,12 @@ public async Task CloseAsync(CloseReason reason) if (reason != CloseReason.LeaseLost) { // Since this pump is dead, release the lease. 
- await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); + // Ignore LeaseLostException + try + { + await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); + } + catch (LeaseLostException) { } } this.PumpStatus = PartitionPumpStatus.Closed; diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs index f14cfc0..8692c6f 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs @@ -152,22 +152,31 @@ Task SingleProcessorHost() [Fact] async Task MultipleProcessorHosts() { - Log("Testing with 2 EventProcessorHost instances"); + int hostCount = 3; + Log($"Testing with {hostCount} EventProcessorHost instances"); + + // Prepare partition trackers. var partitionReceiveEvents = new ConcurrentDictionary(); foreach (var partitionId in PartitionIds) { partitionReceiveEvents[partitionId] = new AsyncAutoResetEvent(false); } - int hostCount = 2; + // Prepare host trackers. + var hostReceiveEvents = new ConcurrentDictionary(); + var hosts = new List(); try { - for (int i = 0; i < hostCount; i++) + for (int hostId = 0; hostId < hostCount; hostId++) { + var thisHostName = $"host-{hostId}"; + hostReceiveEvents[thisHostName] = new AsyncAutoResetEvent(false); + Log("Creating EventProcessorHost"); var eventProcessorHost = new EventProcessorHost( + thisHostName, string.Empty, // Passing empty as entity path here rsince path is already in EH connection string. PartitionReceiver.DefaultConsumerGroupName, this.EventHubConnectionString, @@ -194,11 +203,11 @@ async Task MultipleProcessorHosts() processor.OnProcessEvents += (_, eventsArgs) => { int eventCount = eventsArgs.Item2.events != null ? 
eventsArgs.Item2.events.Count() : 0; - Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); if (eventCount > 0) { - var receivedEvent = partitionReceiveEvents[partitionId]; - receivedEvent.Set(); + Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + partitionReceiveEvents[partitionId].Set(); + hostReceiveEvents[hostName].Set(); } }; }; @@ -206,8 +215,10 @@ async Task MultipleProcessorHosts() await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); } + // Allow some time for each host to own at least 1 partition. + // Partition stealing logic balances partition ownership one at a time. Log("Waiting for partition ownership to settle..."); - await Task.Delay(TimeSpan.FromSeconds(30)); + await Task.Delay(TimeSpan.FromSeconds(60)); Log("Sending an event to each partition"); var sendTasks = new List(); @@ -218,11 +229,17 @@ async Task MultipleProcessorHosts() await Task.WhenAll(sendTasks); Log("Verifying an event was received by each partition"); - foreach (var partitionId in PartitionIds) + foreach (var e in partitionReceiveEvents) + { + bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.True(ret, $"Partition {e.Key} didn't receive any message!"); + } + + Log("Verifying at least an event was received by each host"); + foreach (var e in hostReceiveEvents) { - var receivedEvent = partitionReceiveEvents[partitionId]; - bool partitionReceivedMessage = await receivedEvent.WaitAsync(TimeSpan.FromSeconds(30)); - Assert.True(partitionReceivedMessage, $"Partition {partitionId} didn't receive any message!"); + bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.True(ret, $"Host {e.Key} didn't receive any message!"); } } finally From 20068bdb76cbb015a3128837e4150845fc80935f Mon Sep 17 00:00:00 2001 From: serkar Date: Wed, 8 Feb 2017 14:39:01 -0800 Subject: [PATCH 03/12] Set PartitionContext's SequenceNumber and 
Offset only before delivering the messages to the handler. --- .../PartitionPump.cs | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs index 0430ae5..f06adbd 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs @@ -140,15 +140,18 @@ protected async Task ProcessEventsAsync(IEnumerable events) // protected by synchronizing too. using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false)) { - int eventCount = events?.Count() ?? 0; - ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.Id, this.PartitionContext.PartitionId, eventCount); + ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.Id, + this.PartitionContext.PartitionId, events?.Count() ?? 0); try { - if (eventCount > 0) + EventData last = events?.LastOrDefault(); + if (last != null) { - var lastMessage = events.Last(); - this.PartitionContext.SequenceNumber = lastMessage.SystemProperties.SequenceNumber; - this.PartitionContext.Offset = lastMessage.SystemProperties.Offset; + ProcessorEventSource.Log.PartitionPumpInfo( + this.Host.Id, + this.PartitionContext.PartitionId, + "Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber); + this.PartitionContext.SetOffsetAndSequenceNumber(last); } await this.Processor.ProcessEventsAsync(this.PartitionContext, events).ConfigureAwait(false); @@ -162,16 +165,6 @@ protected async Task ProcessEventsAsync(IEnumerable events) { ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStop(this.Host.Id, this.PartitionContext.PartitionId); } - - EventData last = events?.LastOrDefault(); - if (last != null) - { - ProcessorEventSource.Log.PartitionPumpInfo( - this.Host.Id, - this.PartitionContext.PartitionId, - 
"Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber); - this.PartitionContext.SetOffsetAndSequenceNumber(last); - } } } From aa2e86027c2c13409862e5c4bf0ba8faa2384da7 Mon Sep 17 00:00:00 2001 From: serkar Date: Wed, 8 Feb 2017 14:40:21 -0800 Subject: [PATCH 04/12] CheckpointAsync(EventData eventData) should not allow checkpointing if the supplied eventData is ahead of the processed batch. --- .../PartitionContext.cs | 54 ++++++------------- 1 file changed, 16 insertions(+), 38 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs index 2f958ac..910ff90 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs @@ -44,15 +44,6 @@ public string Owner object ThisLock { get; } - /// - /// Updates the offset/sequenceNumber in the PartitionContext with the values in the received EventData object. - /// - /// Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be - /// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber - /// is equal or greater, the new offset will be as well. - /// - /// A received EventData with valid offset and sequenceNumber - /// If the sequenceNumber in the provided event is less than the current value internal void SetOffsetAndSequenceNumber(EventData eventData) { if (eventData == null) @@ -60,35 +51,12 @@ internal void SetOffsetAndSequenceNumber(EventData eventData) throw new ArgumentNullException(nameof(eventData)); } - this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber); - } - - /// - /// Updates the offset/sequenceNumber in the PartitionContext. 
- /// - /// These two values are closely tied and must be updated in an atomic fashion, hence the combined setter. - /// Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be - /// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber - /// is equal or greater, the new offset will be as well. - /// - /// New offset value - /// New sequenceNumber value - /// If the sequenceNumber in the provided event is less than the current value - void SetOffsetAndSequenceNumber(string offset, long sequenceNumber) - { - lock(this.ThisLock) + lock (this.ThisLock) { - if (sequenceNumber >= this.SequenceNumber) - { - this.Offset = offset; - this.SequenceNumber = sequenceNumber; - } - else - { - throw new ArgumentOutOfRangeException("offset/sequenceNumber", $"New offset {offset}/{sequenceNumber} is less than previous {this.Offset}/{this.SequenceNumber}"); - } + this.Offset = eventData.SystemProperties.Offset; + this.SequenceNumber = eventData.SystemProperties.SequenceNumber; } - } + } internal async Task GetInitialOffsetAsync() // throws InterruptedException, ExecutionException { @@ -132,7 +100,6 @@ internal async Task GetInitialOffsetAsync() // throws InterruptedExcepti /// /// Writes the current offset and sequenceNumber to the checkpoint store via the checkpoint manager. /// - /// If this.sequenceNumber is less than the last checkpointed value public Task CheckpointAsync() { // Capture the current offset and sequenceNumber. Synchronize to be sure we get a matched pair @@ -153,10 +120,21 @@ public Task CheckpointAsync() /// values to the checkpoint store via the checkpoint manager. 
/// /// A received EventData with valid offset and sequenceNumber + /// If suplied eventData is null /// If the sequenceNumber is less than the last checkpointed value public Task CheckpointAsync(EventData eventData) { - this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber); + if (eventData == null) + { + throw new ArgumentNullException("eventData"); + } + + // We have never seen this sequence number yet + if (eventData.SystemProperties.SequenceNumber > this.SequenceNumber) + { + throw new ArgumentOutOfRangeException("eventData.SystemProperties.SequenceNumber"); + } + return this.PersistCheckpointAsync(new Checkpoint(this.PartitionId, eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber)); } From 82b795717f0bbb6d95744eb662828577b8beb4e5 Mon Sep 17 00:00:00 2001 From: serkar Date: Wed, 8 Feb 2017 17:52:09 -0800 Subject: [PATCH 05/12] Add unit test to cover checkpointing messages in the middle of a batch. --- .../EventProcessorHostTests.cs | 135 ++++++++++++------ .../TestEventProcessor.cs | 3 +- 2 files changed, 97 insertions(+), 41 deletions(-) diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs index 2bb4506..cb0c97c 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs @@ -272,7 +272,7 @@ async Task WithBlobPrefix() this.StorageConnectionString, leaseContainerName, "firsthost"); - var setOfMessages1 = await RunGenericScenario(eventProcessorHostFirst); + var runResult1 = await RunGenericScenario(eventProcessorHostFirst); // Consume all messages with second host. // Create host with 'secondhost' prefix. 
@@ -286,12 +286,12 @@ async Task WithBlobPrefix() this.StorageConnectionString, leaseContainerName, "secondhost"); - var setOfMessages2 = await RunGenericScenario(eventProcessorHostSecond, totalNumberOfEventsToSend: 0); + var runResult2 = await RunGenericScenario(eventProcessorHostSecond, totalNumberOfEventsToSend: 0); // Confirm that we are looking at 2 identical sets of messages in the end. - foreach (var kvp in setOfMessages1) + foreach (var kvp in runResult1.ReceivedEvents) { - Assert.True(kvp.Value.Count() == setOfMessages2[kvp.Key].Count, + Assert.True(kvp.Value.Count() == runResult2.ReceivedEvents[kvp.Key].Count, $"The sets of messages returned from first host and the second host are different for partition {kvp.Key}."); } } @@ -548,10 +548,10 @@ async Task InitialOffsetProviderWithDateTime() MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions); + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -580,10 +580,10 @@ async Task InitialOffsetProviderWithOffset() MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions); + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); // We should have received only 1 event from each partition. 
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -604,10 +604,10 @@ async Task InitialOffsetProviderWithEndOfStream() MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions); + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -641,10 +641,11 @@ async Task InitialOffsetProviderOverrideBehavior() InitialOffsetProvider = partitionId => PartitionReceiver.StartOfStream, MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkPointLastEvent: false); + + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkpointLastEvent: false); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -670,10 +671,10 @@ async Task CheckpointEventDataShouldHold() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents = await RunGenericScenario(eventProcessorHostSecond); + var runResult = await RunGenericScenario(eventProcessorHostSecond); // We should have received only 1 event from each partition. 
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -689,7 +690,7 @@ async Task CheckpointBatchShouldHold() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - await RunGenericScenario(eventProcessorHostFirst, checkPointLastEvent: false, checkPointBatch: true); + await RunGenericScenario(eventProcessorHostFirst, checkpointLastEvent: false, checkpointBatch: true); // For the second time we initiate a host and this time it should pick from where the previous host left. // In other words, it shouldn't start receiving from start of the stream. @@ -699,10 +700,10 @@ async Task CheckpointBatchShouldHold() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents = await RunGenericScenario(eventProcessorHostSecond); + var runResult = await RunGenericScenario(eventProcessorHostSecond); // We should have received only 1 event from each partition. 
- Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -812,8 +813,8 @@ async Task NoCheckpointThenNewHostReadsFromStart() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents1 = await RunGenericScenario(eventProcessorHostFirst, checkPointLastEvent: false); - var totalEventsFromFirstHost = receivedEvents1.Sum(part => part.Value.Count); + var runResult1 = await RunGenericScenario(eventProcessorHostFirst, checkpointLastEvent: false); + var totalEventsFromFirstHost = runResult1.ReceivedEvents.Sum(part => part.Value.Count); // Second time we initiate a host, it should pick from where previous host left. // In other words, it shouldn't start receiving from start of the stream. @@ -823,12 +824,33 @@ async Task NoCheckpointThenNewHostReadsFromStart() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents2 = await RunGenericScenario(eventProcessorHostSecond); - var totalEventsFromSecondHost = receivedEvents2.Sum(part => part.Value.Count); + var runResult2 = await RunGenericScenario(eventProcessorHostSecond); + var totalEventsFromSecondHost = runResult2.ReceivedEvents.Sum(part => part.Value.Count); // Second host should have received +partition-count messages. Assert.True(totalEventsFromFirstHost + PartitionIds.Count() == totalEventsFromSecondHost, - $"Second host received {receivedEvents2} events where as first host receive {receivedEvents1} events."); + $"Second host received {totalEventsFromSecondHost} events where as first host receive {totalEventsFromFirstHost} events."); + } + + /// + /// Checkpointing every message received should be Ok. No failures expected. 
+ /// + /// + [Fact] + async Task CheckpointEveryMessageReceived() + { + var eventProcessorHost = new EventProcessorHost( + null, + PartitionReceiver.DefaultConsumerGroupName, + this.EventHubConnectionString, + this.StorageConnectionString, + this.LeaseContainerName); + + var runResult = await RunGenericScenario(eventProcessorHost, totalNumberOfEventsToSend: 10, + checkpointLastEvent: false, checkpoingEveryEvent: true); + + // Validate there were not failures. + Assert.True(runResult.NumberOfFailures == 0, $"RunResult returned with {runResult.NumberOfFailures} failures!"); } async Task>> DiscoverEndOfStream() @@ -845,11 +867,11 @@ async Task>> DiscoverEndOfStream() return partitions.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); } - async Task>> RunGenericScenario(EventProcessorHost eventProcessorHost, - EventProcessorOptions epo = null, int totalNumberOfEventsToSend = 1, bool checkPointLastEvent = true, - bool checkPointBatch = false) + async Task RunGenericScenario(EventProcessorHost eventProcessorHost, + EventProcessorOptions epo = null, int totalNumberOfEventsToSend = 1, bool checkpointLastEvent = true, + bool checkpointBatch = false, bool checkpoingEveryEvent = false) { - var receivedEvents = new ConcurrentDictionary>(); + var runResult = new GenericScenarioResult(); var lastReceivedAt = DateTime.Now; if (epo == null) @@ -875,27 +897,32 @@ async Task>> RunGenericScenario(EventProcesso string hostName = createArgs.Item1.Owner; processor.OnOpen += (_, partitionContext) => Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); processor.OnClose += (_, closeArgs) => Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); - processor.OnProcessError += (_, errorArgs) => Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + processor.OnProcessError += (_, errorArgs) => + { + Log($"{hostName} > Partition {partitionId} TestEventProcessor process error 
{errorArgs.Item2.Message}"); + Interlocked.Increment(ref runResult.NumberOfFailures); + }; processor.OnProcessEvents += (_, eventsArgs) => { int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); if (eventCount > 0) { - List events; - receivedEvents.TryGetValue(partitionId, out events); - if (events == null) + lastReceivedAt = DateTime.Now; + runResult.AddEvents(partitionId, eventsArgs.Item2.events); + + foreach (var e in eventsArgs.Item2.events) { - events = new List(); + // Checkpoint every event received? + if (checkpoingEveryEvent) + { + eventsArgs.Item1.CheckpointAsync(e).Wait(); + } } - - events.AddRange(eventsArgs.Item2.events); - receivedEvents[partitionId] = events; - lastReceivedAt = DateTime.Now; } - eventsArgs.Item2.checkPointLastEvent = checkPointLastEvent; - eventsArgs.Item2.checkPointBatch = checkPointBatch; + eventsArgs.Item2.checkPointLastEvent = checkpointLastEvent; + eventsArgs.Item2.checkPointBatch = checkpointBatch; }; }; @@ -922,10 +949,12 @@ async Task>> RunGenericScenario(EventProcesso await Task.Delay(1000); } - Log("Verifying at least an event was received by each partition"); + Log($"Verifying at least {totalNumberOfEventsToSend} event(s) was received by each partition"); foreach (var partitionId in PartitionIds) { - Assert.True(receivedEvents.ContainsKey(partitionId), $"Partition {partitionId} didn't receive any message!"); + Assert.True(runResult.ReceivedEvents.ContainsKey(partitionId) + && runResult.ReceivedEvents[partitionId].Count >= totalNumberOfEventsToSend, + $"Partition {partitionId} didn't receive expected number of messages. 
Expected {totalNumberOfEventsToSend}, received {runResult.ReceivedEvents[partitionId].Count}."); } Log("Success"); @@ -936,7 +965,7 @@ async Task>> RunGenericScenario(EventProcesso await eventProcessorHost.UnregisterEventProcessorAsync(); } - return receivedEvents.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + return runResult; } async Task SendToPartitionAsync(string partitionId, string messageBody, string connectionString) @@ -961,5 +990,31 @@ protected void Log(string message) Console.WriteLine(log); } } + + class GenericScenarioResult + { + public ConcurrentDictionary> ReceivedEvents = new ConcurrentDictionary>(); + public int NumberOfFailures = 0; + + object listLock = new object(); + + public void AddEvents(string partitionId, IEnumerable addEvents) + { + List events; + this.ReceivedEvents.TryGetValue(partitionId, out events); + if (events == null) + { + events = new List(); + } + + // Account for the case where 2 hosts are racing by working on the same partition. + lock (listLock) + { + events.AddRange(addEvents); + } + + this.ReceivedEvents[partitionId] = events; + } + } } diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs index 72ea6df..d019951 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs @@ -48,11 +48,12 @@ Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable Date: Thu, 9 Feb 2017 12:56:29 -0800 Subject: [PATCH 06/12] Marking ICheckpointManager.UpdateCheckpointAsync(Checkpoint checkpoint) as obsolete --- .../AzureStorageCheckpointLeaseManager.cs | 6 ++++++ .../ICheckpointManager.cs | 3 +++ 2 files changed, 9 insertions(+) diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs index
2025a74..2127148 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs @@ -112,6 +112,12 @@ public async Task CreateCheckpointIfNotExistsAsync(string partitionI return checkpoint; } + [Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] + public Task UpdateCheckpointAsync(Checkpoint checkpoint) + { + throw new NotImplementedException(); + } + public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) { AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease)lease); diff --git a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs index d2372f4..334c6b6 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs @@ -46,6 +46,9 @@ public interface ICheckpointManager /// The checkpoint for the given partition, whether newly created or already existing. Task CreateCheckpointIfNotExistsAsync(string partitionId); + [System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] + Task UpdateCheckpointAsync(Checkpoint checkpoint); + /// /// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint. /// From 767af996fead23a0dba31d70e5d5dc1d8940dff8 Mon Sep 17 00:00:00 2001 From: serkar Date: Mon, 13 Feb 2017 13:30:01 -0800 Subject: [PATCH 07/12] Fix: ReceiveHandler and PartitionPump should gracefully handle exceptions thrown at ProcessError callbacks. Adding a CIT to cover error cases in EPH. 
--- .../EventHubPartitionPump.cs | 13 +- .../Amqp/AmqpPartitionReceiver.cs | 13 +- .../EventHubsEventSource.cs | 11 +- .../EventProcessorHostTests.cs | 2 +- .../NegativeCases.cs | 143 ++++++++++++++++++ 5 files changed, 174 insertions(+), 8 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs index 2ddc686..a1ebd20 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs @@ -161,10 +161,15 @@ public async Task ProcessErrorAsync(Exception error) this.eventHubPartitionPump.Host.Id, this.eventHubPartitionPump.PartitionContext.PartitionId, "EventHub client error:", error.ToString()); } - // We would like to deliver all errors in the pump to error handler. - await this.eventHubPartitionPump.ProcessErrorAsync(error).ConfigureAwait(false); - - this.eventHubPartitionPump.PumpStatus = PartitionPumpStatus.Errored; + try + { + // We would like to deliver all errors in the pump to error handler. + await this.eventHubPartitionPump.ProcessErrorAsync(error).ConfigureAwait(false); + } + finally + { + this.eventHubPartitionPump.PumpStatus = PartitionPumpStatus.Errored; + } } } } diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs index 90cd015..968a1c4 100644 --- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs +++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs @@ -331,7 +331,7 @@ void ReceiveHandlerClose() // Encapsulates taking the receivePumpLock, checking this.receiveHandler for null, // calls this.receiveHandler.ProcessErrorAsync (starting this operation inside the receivePumpLock). 
- Task ReceiveHandlerProcessErrorAsync(Exception error) + async Task ReceiveHandlerProcessErrorAsync(Exception error) { Task processErrorTask = null; lock (this.receivePumpLock) @@ -342,7 +342,16 @@ Task ReceiveHandlerProcessErrorAsync(Exception error) } } - return processErrorTask ?? Task.FromResult(0); + try + { + await (processErrorTask ?? Task.FromResult(0)); + } + catch (Exception ex) + { + // Trace and swallow any error at this point. + EventHubsEventSource.Log.ExceptionHandled(this.ClientId, this.PartitionId, + $"{ex.Message} while calling ReceiveHandler's ProcessErrorAsync"); + } } // Encapsulates taking the receivePumpLock, checking this.receiveHandler for null, diff --git a/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs b/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs index 0f351cd..65af743 100644 --- a/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs +++ b/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs @@ -199,7 +199,16 @@ public void GetEventHubPartitionRuntimeInformationException(string clientId, str WriteEvent(20, clientId, partitionId, error); } } - + + [Event(100, Level = EventLevel.Error, Message = "{0}: Exception handled: {1}")] + public void ExceptionHandled(string clientId, string partitionId, string error) + { + if (IsEnabled()) + { + WriteEvent(100, clientId, partitionId, error); + } + } + // TODO: Add Keywords if desired. 
//public class Keywords // This is a bitvector //{ diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs index cb0c97c..9cb254c 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs @@ -968,7 +968,7 @@ async Task RunGenericScenario(EventProcessorHost eventPro return runResult; } - async Task SendToPartitionAsync(string partitionId, string messageBody, string connectionString) + protected async Task SendToPartitionAsync(string partitionId, string messageBody, string connectionString) { var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString); try diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs index 54deaa7..ed70f46 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs @@ -4,6 +4,10 @@ namespace Microsoft.Azure.EventHubs.Processor.UnitTests { using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using System.Text; using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; @@ -76,5 +80,144 @@ async Task NonexsistentEntity() Assert.NotNull(ex.InnerException); Assert.IsType(ex.InnerException); } + + /// + /// While processing events one event causes a failure. Host should be able to recover any error. 
+ /// + /// + [Fact] + async Task HostShouldRecoverWhenProcessProcessEventsAsyncThrows() + { + var lastReceivedAt = DateTime.Now; + var lastReceivedAtLock = new object(); + var poisonMessageReceived = false; + var poisonMessageProperty = "poison"; + var processorFactory = new TestEventProcessorFactory(); + var receivedEventCounts = new ConcurrentDictionary(); + + var eventProcessorHost = new EventProcessorHost( + null, + PartitionReceiver.DefaultConsumerGroupName, + this.EventHubConnectionString, + this.StorageConnectionString, + this.LeaseContainerName); + + processorFactory.OnCreateProcessor += (f, createArgs) => + { + var processor = createArgs.Item2; + string partitionId = createArgs.Item1.PartitionId; + string hostName = createArgs.Item1.Owner; + string consumerGroupName = createArgs.Item1.ConsumerGroupName; + processor.OnOpen += (_, partitionContext) => Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor opened"); + processor.OnClose += (_, closeArgs) => Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); + processor.OnProcessError += (_, errorArgs) => + { + Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + + // Throw once more here depending on where we are at exception sequence. + if (errorArgs.Item2.Message.Contains("ExceptionSequence1")) + { + throw new Exception("ExceptionSequence2"); + } + if (errorArgs.Item2.Message.Contains("ExceptionSequence2")) + { + throw new Exception("ExceptionSequence3"); + } + }; + processor.OnProcessEvents += (_, eventsArgs) => + { + int eventCount = eventsArgs.Item2.events != null ? 
eventsArgs.Item2.events.Count() : 0; + Log($"{hostName} > {consumerGroupName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + if (eventCount > 0) + { + lock (lastReceivedAtLock) + { + lastReceivedAt = DateTime.Now; + } + + foreach (var e in eventsArgs.Item2.events) + { + // If this is a poisoned event then throw. + if (!poisonMessageReceived && e.Properties.ContainsKey(poisonMessageProperty)) + { + poisonMessageReceived = true; + Log($"Received poisoned message from partition {partitionId}"); + throw new Exception("ExceptionSequence1"); + } + + // Track received events so we can validate at the end. + if (!receivedEventCounts.ContainsKey(partitionId)) + { + receivedEventCounts[partitionId] = 0; + } + + receivedEventCounts[partitionId]++; + } + } + }; + }; + + try + { + Log("Registering processorFactory..."); + var epo = new EventProcessorOptions() + { + MaxBatchSize = 100 + }; + await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, epo); + + Log("Waiting for partition ownership to settle..."); + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Send first set of messages. + Log("Sending an event to each partition as the first set of messages."); + var sendTasks = new List(); + foreach (var partitionId in PartitionIds) + { + sendTasks.Add(this.SendToPartitionAsync(partitionId, $"{partitionId} event.", this.ConnectionStringBuilder.ToString())); + } + await Task.WhenAll(sendTasks); + + // Now send 1 poisoned message. This will fail one of the partition pumps. + Log($"Sending a poison event to parttition {PartitionIds.First()}"); + var client = EventHubClient.CreateFromConnectionString(this.EventHubConnectionString); + var pSender = client.CreatePartitionSender(PartitionIds.First()); + var ed = new EventData(Encoding.UTF8.GetBytes("This is poison message")); + ed.Properties[poisonMessageProperty] = true; + await pSender.SendAsync(ed); + + // Wait some time. The host should fail and then recover during this time.
+ await Task.Delay(30000); + + // Send second set of messages. + Log("Sending an event to each partition as the second set of messages."); + sendTasks.Clear(); + foreach (var partitionId in PartitionIds) + { + sendTasks.Add(this.SendToPartitionAsync(partitionId, $"{partitionId} event.", this.ConnectionStringBuilder.ToString())); + } + await Task.WhenAll(sendTasks); + + Log("Waiting until hosts are idle, i.e. no more messages to receive."); + while (lastReceivedAt > DateTime.Now.AddSeconds(-60)) + { + await Task.Delay(1000); + } + + Log("Verifying poison message was received"); + Assert.True(poisonMessageReceived, "Didn't receive poison message!"); + + Log("Verifying received events by each partition"); + foreach (var partitionId in PartitionIds) + { + var receivedEventCount = receivedEventCounts[partitionId]; + Assert.True(receivedEventCount >= 2, $"Partition {partitionId} received {receivedEventCount} where as at least 2 expected!"); + } + } + finally + { + Log("Calling UnregisterEventProcessorAsync."); + } + } } } From b97e940271b0e31c054ea0970270e00561e9d936 Mon Sep 17 00:00:00 2001 From: serkar Date: Tue, 14 Feb 2017 09:21:25 -0800 Subject: [PATCH 08/12] HostShouldRecoverWhenProcessProcessEventsAsyncThrows unit test to unregister host at the end. 
--- .../NegativeCases.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs index ed70f46..3b47752 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs @@ -210,6 +210,11 @@ async Task HostShouldRecoverWhenProcessProcessEventsAsyncThrows() Log("Verifying received events by each partition"); foreach (var partitionId in PartitionIds) { + if (!receivedEventCounts.ContainsKey(partitionId)) + { + throw new Exception($"Partition {partitionId} didn't receive any messages!"); + } + var receivedEventCount = receivedEventCounts[partitionId]; Assert.True(receivedEventCount >= 2, $"Partition {partitionId} received {receivedEventCount} where as at least 2 expected!"); } @@ -217,6 +222,7 @@ async Task HostShouldRecoverWhenProcessProcessEventsAsyncThrows() finally { Log("Calling UnregisterEventProcessorAsync."); + await eventProcessorHost.UnregisterEventProcessorAsync(); } } } From 9ae7420bd23d9059b135eeb0c508f6e27dabe62b Mon Sep 17 00:00:00 2001 From: serkar Date: Fri, 3 Mar 2017 17:48:54 -0800 Subject: [PATCH 09/12] Cancelling swallow and trace change in ReceiveHandlerProcessErrorAsync(Exception error) --- .../Amqp/AmqpPartitionReceiver.cs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs index 6453951..1b21053 100644 --- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs +++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs @@ -348,7 +348,7 @@ void ReceiveHandlerClose() // Encapsulates taking the receivePumpLock, checking this.receiveHandler for null, // calls this.receiveHandler.ProcessErrorAsync (starting this operation inside the receivePumpLock). 
- async Task ReceiveHandlerProcessErrorAsync(Exception error) + Task ReceiveHandlerProcessErrorAsync(Exception error) { Task processErrorTask = null; lock (this.receivePumpLock) @@ -359,16 +359,7 @@ async Task ReceiveHandlerProcessErrorAsync(Exception error) } } - try - { - await (processErrorTask ?? Task.FromResult(0)); - } - catch (Exception ex) - { - // Trace and swallow any error at this point. - EventHubsEventSource.Log.ExceptionHandled(this.ClientId, this.PartitionId, - $"{ex.Message} while calling ReceiveHandler's ProcessErrorAsync"); - } + return processErrorTask ?? Task.FromResult(0); } // Encapsulates taking the receivePumpLock, checking this.receiveHandler for null, From 50c50b1675177a3675616446d38e8fc36ac54e2d Mon Sep 17 00:00:00 2001 From: serkar Date: Fri, 3 Mar 2017 17:54:06 -0800 Subject: [PATCH 10/12] Remove unnecessary ExceptionHandled trace from EventSource. --- src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs b/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs index 6675096..e2e17c4 100644 --- a/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs +++ b/src/Microsoft.Azure.EventHubs/EventHubsEventSource.cs @@ -209,15 +209,6 @@ public void ReceiveHandlerExitingWithError(string clientId, string partitionId, } } - [Event(100, Level = EventLevel.Error, Message = "{0}: Exception handled: {1}")] - public void ExceptionHandled(string clientId, string partitionId, string error) - { - if (IsEnabled()) - { - WriteEvent(100, clientId, partitionId, error); - } - } - // TODO: Add Keywords if desired. //public class Keywords // This is a bitvector //{ From 2030a1a823a699ddcb917bfe86aa7ee3c847b58f Mon Sep 17 00:00:00 2001 From: serkar Date: Thu, 9 Mar 2017 09:16:24 -0800 Subject: [PATCH 11/12] + Removing Obsolete signature from ICheckpointManager. + TaskContinuationOptions.OnlyOnRanToCompletion while persisting lease. 
--- .../AzureStorageCheckpointLeaseManager.cs | 12 +++--------- .../ICheckpointManager.cs | 3 --- .../PartitionContext.cs | 4 ++-- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs index 267f0cd..865b1fa 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs @@ -112,18 +112,12 @@ public async Task CreateCheckpointIfNotExistsAsync(string partitionI return checkpoint; } - [Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] - public Task UpdateCheckpointAsync(Checkpoint checkpoint) - { - throw new NotImplementedException(); - } - public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) { AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease)lease); newLease.Offset = checkpoint.Offset; newLease.SequenceNumber = checkpoint.SequenceNumber; - await UpdateLeaseAsync(newLease).ConfigureAwait(false); + await this.UpdateLeaseAsync(newLease).ConfigureAwait(false); } public Task DeleteCheckpointAsync(string partitionId) @@ -364,7 +358,7 @@ async Task RenewLeaseCoreAsync(AzureBlobLease lease) { if (WasLeaseLost(partitionId, se)) { - throw new LeaseLostException(lease, se); + throw new LeaseLostException(partitionId, se); } throw; @@ -400,7 +394,7 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) { if (WasLeaseLost(partitionId, se)) { - throw new LeaseLostException(lease, se); + throw new LeaseLostException(partitionId, se); } throw; diff --git a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs index 334c6b6..d2372f4 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs +++ 
b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs @@ -46,9 +46,6 @@ public interface ICheckpointManager /// The checkpoint for the given partition, whether newly created or already existing. Task CreateCheckpointIfNotExistsAsync(string partitionId); - [System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] - Task UpdateCheckpointAsync(Checkpoint checkpoint); - /// /// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint. /// diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs index 910ff90..9a6f368 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs @@ -143,7 +143,7 @@ public override string ToString() return $"PartitionContext({this.EventHubPath}/{this.ConsumerGroupName}/{this.PartitionId}/{this.SequenceNumber})"; } - async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOfRangeException, InterruptedException, ExecutionException + async Task PersistCheckpointAsync(Checkpoint checkpoint) { ProcessorEventSource.Log.PartitionPumpCheckpointStart(this.host.Id, checkpoint.PartitionId, checkpoint.Offset, checkpoint.SequenceNumber); try @@ -160,7 +160,7 @@ await this.host.CheckpointManager.UpdateCheckpointAsync(this.Lease, checkpoint). 
{ this.Lease.Offset = checkpoint.Offset; this.Lease.SequenceNumber = checkpoint.SequenceNumber; - }).ConfigureAwait(false); + }, TaskContinuationOptions.OnlyOnRanToCompletion).ConfigureAwait(false); } else { From 661c3060d8ee28718575c37e6b1a30df8e7e15c1 Mon Sep 17 00:00:00 2001 From: serkar Date: Mon, 13 Mar 2017 09:08:09 -0700 Subject: [PATCH 12/12] Adding Obsolete ICheckpointManager.UpdateCheckpointAsync(Checkpoint checkpoint) back --- .../AzureStorageCheckpointLeaseManager.cs | 6 ++++++ .../ICheckpointManager.cs | 3 +++ 2 files changed, 9 insertions(+) diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs index 865b1fa..243cf39 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs @@ -103,6 +103,12 @@ public async Task GetCheckpointAsync(string partitionId) return checkpoint; } + [Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] + public Task UpdateCheckpointAsync(Checkpoint checkpoint) + { + throw new NotImplementedException(); + } + public async Task CreateCheckpointIfNotExistsAsync(string partitionId) { // Normally the lease will already be created, checkpoint store is initialized after lease store. diff --git a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs index d2372f4..e48e209 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs @@ -38,6 +38,9 @@ public interface ICheckpointManager /// Checkpoint info for the given partition, or null if none has been previously stored. 
Task GetCheckpointAsync(string partitionId); + [System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] + Task UpdateCheckpointAsync(Checkpoint checkpoint); + /// /// Create the checkpoint for the given partition if it doesn't exist. Do nothing if it does exist. /// The offset/sequenceNumber for a freshly-created checkpoint should be set to StartOfStream/0.