diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs index 769d04f..f1c54cc 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs @@ -10,7 +10,6 @@ namespace Microsoft.Azure.EventHubs.Processor class AzureBlobLease : Lease { // ctor needed for deserialization - internal AzureBlobLease() { } @@ -36,14 +35,9 @@ internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(sourc } // do not serialize - [JsonIgnore] public CloudBlockBlob Blob { get; } - public string Offset { get; set; } - - public long SequenceNumber { get; set; } - public override async Task IsExpired() { await this.Blob.FetchAttributesAsync().ConfigureAwait(false); // Get the latest metadata diff --git a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs index 7b56aa7..243cf39 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/AzureStorageCheckpointLeaseManager.cs @@ -28,7 +28,13 @@ sealed class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseMana static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2); static readonly TimeSpan leaseDuration = TimeSpan.FromSeconds(30); static readonly TimeSpan leaseRenewInterval = TimeSpan.FromSeconds(10); - readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions(); + + // Lease renew calls shouldn't wait more than leaseRenewInterval + readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions() + { + ServerTimeout = leaseRenewInterval, + MaximumExecutionTime = TimeSpan.FromMinutes(1) + }; internal AzureStorageCheckpointLeaseManager(string storageConnectionString, string leaseContainerName, string storageBlobPrefix) { @@ -97,6 +103,12 @@ public async Task GetCheckpointAsync(string partitionId) return checkpoint; } + [Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] + public Task UpdateCheckpointAsync(Checkpoint checkpoint) + { + throw new NotImplementedException(); + } + public async Task CreateCheckpointIfNotExistsAsync(string partitionId) { // Normally the lease will already be created, checkpoint store is initialized after lease store. @@ -106,13 +118,12 @@ public async Task CreateCheckpointIfNotExistsAsync(string partitionI return checkpoint; } - public async Task UpdateCheckpointAsync(Checkpoint checkpoint) + public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) { - // Need to fetch the most current lease data so that we can update it correctly. - AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(checkpoint.PartitionId).ConfigureAwait(false); - lease.Offset = checkpoint.Offset; - lease.SequenceNumber = checkpoint.SequenceNumber; - await UpdateLeaseAsync(lease).ConfigureAwait(false); + AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease)lease); + newLease.Offset = checkpoint.Offset; + newLease.SequenceNumber = checkpoint.SequenceNumber; + await this.UpdateLeaseAsync(newLease).ConfigureAwait(false); } public Task DeleteCheckpointAsync(string partitionId) @@ -343,8 +354,8 @@ public Task RenewLeaseAsync(Lease lease) async Task RenewLeaseCoreAsync(AzureBlobLease lease) { CloudBlockBlob leaseBlob = lease.Blob; - bool retval = true; string partitionId = lease.PartitionId; + try { await leaseBlob.RenewLeaseAsync(AccessCondition.GenerateLeaseCondition(lease.Token), this.renewRequestOptions, null).ConfigureAwait(false); @@ -353,15 +364,13 @@ async Task RenewLeaseCoreAsync(AzureBlobLease lease) { if (WasLeaseLost(partitionId, se)) { - retval = false; - } - else - { - throw; + throw new LeaseLostException(partitionId, se); } + + throw; } - return retval; + return true; } public Task ReleaseLeaseAsync(Lease lease) @@ -374,8 +383,8 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, lease.PartitionId, "Releasing lease"); CloudBlockBlob leaseBlob = lease.Blob; - bool retval = true; string partitionId = lease.PartitionId; + try { string leaseId = lease.Token; @@ -391,15 +400,13 @@ async Task ReleaseLeaseCoreAsync(AzureBlobLease lease) { if (WasLeaseLost(partitionId, se)) { - retval = false; - } - else - { - throw; + throw new LeaseLostException(partitionId, se); } + + throw; } - return retval; + return true; } public Task UpdateLeaseAsync(Lease lease) @@ -419,16 +426,13 @@ async Task UpdateLeaseCoreAsync(AzureBlobLease lease) string token = lease.Token; if (string.IsNullOrEmpty(token)) - { - return false; - } - - // First, renew the lease to make sure the update will go through. - if (!await this.RenewLeaseAsync(lease).ConfigureAwait(false)) { return false; } + // First, renew the lease to make sure the update will go through. + await this.RenewLeaseAsync(lease).ConfigureAwait(false); + CloudBlockBlob leaseBlob = lease.Blob; try { @@ -481,6 +485,7 @@ bool WasLeaseLost(string partitionId, StorageException se) } } } + return retval; } diff --git a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs index 6ec6da2..e48e209 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs @@ -38,6 +38,9 @@ public interface ICheckpointManager /// Checkpoint info for the given partition, or null if none has been previously stored. Task GetCheckpointAsync(string partitionId); + [System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)] + Task UpdateCheckpointAsync(Checkpoint checkpoint); + /// /// Create the checkpoint for the given partition if it doesn't exist. Do nothing if it does exist. /// The offset/sequenceNumber for a freshly-created checkpoint should be set to StartOfStream/0. @@ -49,8 +52,9 @@ public interface ICheckpointManager /// /// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint. /// + /// Partition information against which to perform a checkpoint. /// offset/sequeceNumber to update the store with. - Task UpdateCheckpointAsync(Checkpoint checkpoint); + Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint); /// /// Delete the stored checkpoint for the given partition. If there is no stored checkpoint for the diff --git a/src/Microsoft.Azure.EventHubs.Processor/Lease.cs b/src/Microsoft.Azure.EventHubs.Processor/Lease.cs index 23905f4..877dd63 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/Lease.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/Lease.cs @@ -26,6 +26,10 @@ protected Lease(Lease source) this.Token = source.Token; } + public string Offset { get; set; } + + public long SequenceNumber { get; set; } + public string PartitionId { get; set; } public string Owner { get; set; } @@ -36,7 +40,8 @@ protected Lease(Lease source) public virtual Task IsExpired() { - // this function is meaningless in the base class + // By default lease never expires. + // Deriving class will implement the lease expiry logic. return Task.FromResult(false); } diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs index 27222d2..9a6f368 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs @@ -44,15 +44,6 @@ public string Owner object ThisLock { get; } - /// - /// Updates the offset/sequenceNumber in the PartitionContext with the values in the received EventData object. - /// - /// Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be - /// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber - /// is equal or greater, the new offset will be as well. - /// - /// A received EventData with valid offset and sequenceNumber - /// If the sequenceNumber in the provided event is less than the current value internal void SetOffsetAndSequenceNumber(EventData eventData) { if (eventData == null) @@ -60,35 +51,12 @@ internal void SetOffsetAndSequenceNumber(EventData eventData) throw new ArgumentNullException(nameof(eventData)); } - this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber); - } - - /// - /// Updates the offset/sequenceNumber in the PartitionContext. - /// - /// These two values are closely tied and must be updated in an atomic fashion, hence the combined setter. - /// Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be - /// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber - /// is equal or greater, the new offset will be as well. - /// - /// New offset value - /// New sequenceNumber value - /// If the sequenceNumber in the provided event is less than the current value - void SetOffsetAndSequenceNumber(string offset, long sequenceNumber) - { - lock(this.ThisLock) + lock (this.ThisLock) { - if (sequenceNumber >= this.SequenceNumber) - { - this.Offset = offset; - this.SequenceNumber = sequenceNumber; - } - else - { - throw new ArgumentOutOfRangeException("offset/sequenceNumber", $"New offset {offset}/{sequenceNumber} is less than previous {this.Offset}/{this.SequenceNumber}"); - } + this.Offset = eventData.SystemProperties.Offset; + this.SequenceNumber = eventData.SystemProperties.SequenceNumber; } - } + } internal async Task GetInitialOffsetAsync() // throws InterruptedException, ExecutionException { @@ -132,7 +100,6 @@ internal async Task GetInitialOffsetAsync() // throws InterruptedExcepti /// /// Writes the current offset and sequenceNumber to the checkpoint store via the checkpoint manager. /// - /// If this.sequenceNumber is less than the last checkpointed value public Task CheckpointAsync() { // Capture the current offset and sequenceNumber. Synchronize to be sure we get a matched pair @@ -153,10 +120,21 @@ public Task CheckpointAsync() /// values to the checkpoint store via the checkpoint manager. /// /// A received EventData with valid offset and sequenceNumber + /// If suplied eventData is null /// If the sequenceNumber is less than the last checkpointed value public Task CheckpointAsync(EventData eventData) { - this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber); + if (eventData == null) + { + throw new ArgumentNullException("eventData"); + } + + // We have never seen this sequence number yet + if (eventData.SystemProperties.SequenceNumber > this.SequenceNumber) + { + throw new ArgumentOutOfRangeException("eventData.SystemProperties.SequenceNumber"); + } + return this.PersistCheckpointAsync(new Checkpoint(this.PartitionId, eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber)); } @@ -165,7 +143,7 @@ public override string ToString() return $"PartitionContext({this.EventHubPath}/{this.ConsumerGroupName}/{this.PartitionId}/{this.SequenceNumber})"; } - async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOfRangeException, InterruptedException, ExecutionException + async Task PersistCheckpointAsync(Checkpoint checkpoint) { ProcessorEventSource.Log.PartitionPumpCheckpointStart(this.host.Id, checkpoint.PartitionId, checkpoint.Offset, checkpoint.SequenceNumber); try @@ -175,12 +153,14 @@ async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOf { if (inStoreCheckpoint == null) { - inStoreCheckpoint = await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false); + await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false); } - inStoreCheckpoint.Offset = checkpoint.Offset; - inStoreCheckpoint.SequenceNumber = checkpoint.SequenceNumber; - await this.host.CheckpointManager.UpdateCheckpointAsync(inStoreCheckpoint).ConfigureAwait(false); + await this.host.CheckpointManager.UpdateCheckpointAsync(this.Lease, checkpoint).ContinueWith((obj) => + { + this.Lease.Offset = checkpoint.Offset; + this.Lease.SequenceNumber = checkpoint.SequenceNumber; + }, TaskContinuationOptions.OnlyOnRanToCompletion).ConfigureAwait(false); } else { diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs index 8edada2..9bb3e17 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionManager.cs @@ -192,19 +192,19 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception // Renew any leases that currently belong to us. IEnumerable> gettingAllLeases = leaseManager.GetAllLeases(); List leasesOwnedByOthers = new List(); - int ourLeasesCount = 0; - foreach (Task getLeastTask in gettingAllLeases) + int ourLeaseCount = 0; + foreach (Task getLeaseTask in gettingAllLeases) { try { - Lease possibleLease = await getLeastTask.ConfigureAwait(false); + Lease possibleLease = await getLeaseTask.ConfigureAwait(false); allLeases[possibleLease.PartitionId] = possibleLease; if (await possibleLease.IsExpired().ConfigureAwait(false)) { ProcessorEventSource.Log.PartitionPumpInfo(this.host.Id, possibleLease.PartitionId, "Trying to acquire lease."); if (await leaseManager.AcquireLeaseAsync(possibleLease).ConfigureAwait(false)) { - ourLeasesCount++; + ourLeaseCount++; } else { @@ -215,11 +215,15 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception else if (possibleLease.Owner == this.host.HostName) { ProcessorEventSource.Log.PartitionPumpInfo(this.host.Id, possibleLease.PartitionId, "Trying to renew lease."); - if (await leaseManager.RenewLeaseAsync(possibleLease).ConfigureAwait(false)) + + // Try to renew the lease. If successful then this lease belongs to us, + // if throws LeaseLostException then we don't own it anymore. + try { - ourLeasesCount++; + await leaseManager.RenewLeaseAsync(possibleLease).ConfigureAwait(false); + ourLeaseCount++; } - else + catch (LeaseLostException) { // Probably failed because another host stole it between get and renew leasesOwnedByOthers.Add(possibleLease); @@ -240,31 +244,30 @@ async Task RunLoopAsync(CancellationToken cancellationToken) // throws Exception // Grab more leases if available and needed for load balancing if (leasesOwnedByOthers.Count > 0) { - IEnumerable stealTheseLeases = WhichLeasesToSteal(leasesOwnedByOthers, ourLeasesCount); - if (stealTheseLeases != null) + Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers, ourLeaseCount); + if (stealThisLease != null) { - foreach (Lease stealee in stealTheseLeases) + try { - try + ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.Id, stealThisLease.PartitionId); + if (await leaseManager.AcquireLeaseAsync(stealThisLease).ConfigureAwait(false)) { - ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.Id, stealee.PartitionId); - if (await leaseManager.AcquireLeaseAsync(stealee).ConfigureAwait(false)) - { - // Succeeded in stealing lease - ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.Id, stealee.PartitionId); - ourLeasesCount++; - } - else - { - ProcessorEventSource.Log.EventProcessorHostWarning(this.host.Id, "Failed to steal lease for partition " + stealee.PartitionId, null); - } + // Succeeded in stealing lease + ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.Id, stealThisLease.PartitionId); } - catch (Exception e) + else { - ProcessorEventSource.Log.EventProcessorHostError(this.host.Id, "Exception during stealing lease for partition " + stealee.PartitionId, e.ToString()); - this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, stealee.PartitionId, e, EventProcessorHostActionStrings.StealingLease); + ProcessorEventSource.Log.EventProcessorHostWarning(this.host.Id, + "Failed to steal lease for partition " + stealThisLease.PartitionId, null); } } + catch (Exception e) + { + ProcessorEventSource.Log.EventProcessorHostError(this.host.Id, + "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString()); + this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, + stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease); + } } } @@ -366,12 +369,11 @@ Task RemoveAllPumpsAsync(CloseReason reason) return Task.WhenAll(tasks); } - IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLeaseCount) + Lease WhichLeaseToSteal(List stealableLeases, int haveLeaseCount) { IDictionary countsByOwner = CountLeasesByOwner(stealableLeases); - string biggestOwner = FindBiggestOwner(countsByOwner); - int biggestCount = countsByOwner[biggestOwner]; - List stealTheseLeases = null; + var biggestOwner = countsByOwner.OrderByDescending(o => o.Value).First(); + Lease stealThisLease = null; // If the number of leases is a multiple of the number of hosts, then the desired configuration is // that all hosts own the name number of leases, and the difference between the "biggest" owner and @@ -384,7 +386,7 @@ IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLease // // In either case, if the difference between this host and the biggest owner is 2 or more, then the // system is not in the most evenly-distributed configuration, so steal one lease from the biggest. - // If there is a tie for biggest, findBiggestOwner() picks whichever appears first in the list because + // If there is a tie for biggest, we pick whichever appears first in the list because // it doesn't really matter which "biggest" is trimmed down. // // Stealing one at a time prevents flapping because it reduces the difference between the biggest and @@ -392,61 +394,31 @@ IEnumerable WhichLeasesToSteal(List stealableLeases, int haveLease // end up below 0. This host may become tied for biggest, but it cannot become larger than the host that // it is stealing from. - if ((biggestCount - haveLeaseCount) >= 2) + if ((biggestOwner.Value - haveLeaseCount) >= 2) { - stealTheseLeases = new List(); - foreach (Lease l in stealableLeases) - { - if (l.Owner == biggestOwner) - { - stealTheseLeases.Add(l); - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Proposed to steal lease for partition {l.PartitionId} from {biggestOwner}"); - break; - } - } + stealThisLease = stealableLeases.Where(l => l.Owner == biggestOwner.Key).First(); + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Proposed to steal lease for partition {stealThisLease.PartitionId} from {biggestOwner.Key}"); } - return stealTheseLeases; + return stealThisLease; } - string FindBiggestOwner(IDictionary countsByOwner) + Dictionary CountLeasesByOwner(IEnumerable leases) { - int biggestCount = 0; - string biggestOwner = null; - foreach (string owner in countsByOwner.Keys) - { - if (countsByOwner[owner] > biggestCount) - { - biggestCount = countsByOwner[owner]; - biggestOwner = owner; - } - } - return biggestOwner; - } + var counts = leases.GroupBy(lease => lease.Owner).Select(group => new { + Owner = group.Key, + Count = group.Count() + }); - IDictionary CountLeasesByOwner(IEnumerable leases) - { - IDictionary counts = new Dictionary(); - foreach (Lease l in leases) + // Log ownership mapping. + foreach (var owner in counts) { - if (counts.ContainsKey(l.Owner)) - { - int oldCount = counts[l.Owner]; - counts[l.Owner] = oldCount + 1; - } - else - { - counts[l.Owner] = 1; - } + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Host {owner.Owner} owns {owner.Count} leases"); } - foreach (string owner in counts.Keys) - { - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Host {owner} owns {counts[owner]} leases"); - } + ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Total hosts in list: {counts.Count()}"); - ProcessorEventSource.Log.EventProcessorHostInfo(this.host.Id, $"Total hosts in list: {counts.Count}"); - return counts; + return counts.ToDictionary(e => e.Owner, e => e.Count); } } } \ No newline at end of file diff --git a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs index ce497b2..94b6daa 100644 --- a/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs +++ b/src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs @@ -112,7 +112,12 @@ public async Task CloseAsync(CloseReason reason) if (reason != CloseReason.LeaseLost) { // Since this pump is dead, release the lease. - await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); + // Ignore LeaseLostException + try + { + await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false); + } + catch (LeaseLostException) { } } this.PumpStatus = PartitionPumpStatus.Closed; @@ -142,15 +147,18 @@ protected async Task ProcessEventsAsync(IEnumerable events) // protected by synchronizing too. using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false)) { - int eventCount = events?.Count() ?? 0; - ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.Id, this.PartitionContext.PartitionId, eventCount); + ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.Id, + this.PartitionContext.PartitionId, events?.Count() ?? 0); try { - if (eventCount > 0) + EventData last = events?.LastOrDefault(); + if (last != null) { - var lastMessage = events.Last(); - this.PartitionContext.SequenceNumber = lastMessage.SystemProperties.SequenceNumber; - this.PartitionContext.Offset = lastMessage.SystemProperties.Offset; + ProcessorEventSource.Log.PartitionPumpInfo( + this.Host.Id, + this.PartitionContext.PartitionId, + "Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber); + this.PartitionContext.SetOffsetAndSequenceNumber(last); } await this.Processor.ProcessEventsAsync(this.PartitionContext, events).ConfigureAwait(false); @@ -164,16 +172,6 @@ protected async Task ProcessEventsAsync(IEnumerable events) { ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStop(this.Host.Id, this.PartitionContext.PartitionId); } - - EventData last = events?.LastOrDefault(); - if (last != null) - { - ProcessorEventSource.Log.PartitionPumpInfo( - this.Host.Id, - this.PartitionContext.PartitionId, - "Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber); - this.PartitionContext.SetOffsetAndSequenceNumber(last); - } } } diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs index 764f996..9cb254c 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/EventProcessorHostTests.cs @@ -153,22 +153,31 @@ Task SingleProcessorHost() [Fact] async Task MultipleProcessorHosts() { - Log("Testing with 2 EventProcessorHost instances"); + int hostCount = 3; + Log($"Testing with {hostCount} EventProcessorHost instances"); + + // Prepare partition trackers. var partitionReceiveEvents = new ConcurrentDictionary(); foreach (var partitionId in PartitionIds) { partitionReceiveEvents[partitionId] = new AsyncAutoResetEvent(false); } - int hostCount = 2; + // Prepare host trackers. + var hostReceiveEvents = new ConcurrentDictionary(); + var hosts = new List(); try { - for (int i = 0; i < hostCount; i++) + for (int hostId = 0; hostId < hostCount; hostId++) { + var thisHostName = $"host-{hostId}"; + hostReceiveEvents[thisHostName] = new AsyncAutoResetEvent(false); + Log("Creating EventProcessorHost"); var eventProcessorHost = new EventProcessorHost( + thisHostName, string.Empty, // Passing empty as entity path here rsince path is already in EH connection string. PartitionReceiver.DefaultConsumerGroupName, this.EventHubConnectionString, @@ -195,11 +204,11 @@ async Task MultipleProcessorHosts() processor.OnProcessEvents += (_, eventsArgs) => { int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; - Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); if (eventCount > 0) { - var receivedEvent = partitionReceiveEvents[partitionId]; - receivedEvent.Set(); + Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); + partitionReceiveEvents[partitionId].Set(); + hostReceiveEvents[hostName].Set(); } }; }; @@ -207,8 +216,10 @@ async Task MultipleProcessorHosts() await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions); } + // Allow some time for each host to own at least 1 partition. + // Partition stealing logic balances partition ownership one at a time. Log("Waiting for partition ownership to settle..."); - await Task.Delay(TimeSpan.FromSeconds(30)); + await Task.Delay(TimeSpan.FromSeconds(60)); Log("Sending an event to each partition"); var sendTasks = new List(); @@ -219,11 +230,17 @@ async Task MultipleProcessorHosts() await Task.WhenAll(sendTasks); Log("Verifying an event was received by each partition"); - foreach (var partitionId in PartitionIds) + foreach (var e in partitionReceiveEvents) { - var receivedEvent = partitionReceiveEvents[partitionId]; - bool partitionReceivedMessage = await receivedEvent.WaitAsync(TimeSpan.FromSeconds(30)); - Assert.True(partitionReceivedMessage, $"Partition {partitionId} didn't receive any message!"); + bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.True(ret, $"Partition {e.Key} didn't receive any message!"); + } + + Log("Verifying at least an event was received by each host"); + foreach (var e in hostReceiveEvents) + { + bool ret = await e.Value.WaitAsync(TimeSpan.FromSeconds(30)); + Assert.True(ret, $"Host {e.Key} didn't receive any message!"); } } finally @@ -255,7 +272,7 @@ async Task WithBlobPrefix() this.StorageConnectionString, leaseContainerName, "firsthost"); - var setOfMessages1 = await RunGenericScenario(eventProcessorHostFirst); + var runResult1 = await RunGenericScenario(eventProcessorHostFirst); // Consume all messages with second host. // Create host with 'secondhost' prefix. @@ -269,12 +286,12 @@ async Task WithBlobPrefix() this.StorageConnectionString, leaseContainerName, "secondhost"); - var setOfMessages2 = await RunGenericScenario(eventProcessorHostSecond, totalNumberOfEventsToSend: 0); + var runResult2 = await RunGenericScenario(eventProcessorHostSecond, totalNumberOfEventsToSend: 0); // Confirm that we are looking at 2 identical sets of messages in the end. - foreach (var kvp in setOfMessages1) + foreach (var kvp in runResult1.ReceivedEvents) { - Assert.True(kvp.Value.Count() == setOfMessages2[kvp.Key].Count, + Assert.True(kvp.Value.Count() == runResult2.ReceivedEvents[kvp.Key].Count, $"The sets of messages returned from first host and the second host are different for partition {kvp.Key}."); } } @@ -531,10 +548,10 @@ async Task InitialOffsetProviderWithDateTime() MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions); + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -563,10 +580,10 @@ async Task InitialOffsetProviderWithOffset() MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions); + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -587,10 +604,10 @@ async Task InitialOffsetProviderWithEndOfStream() MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions); + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -624,10 +641,11 @@ async Task InitialOffsetProviderOverrideBehavior() InitialOffsetProvider = partitionId => PartitionReceiver.StartOfStream, MaxBatchSize = 100 }; - var receivedEvents = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkPointLastEvent: false); + + var runResult = await this.RunGenericScenario(eventProcessorHost, processorOptions, checkpointLastEvent: false); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -653,10 +671,10 @@ async Task CheckpointEventDataShouldHold() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents = await RunGenericScenario(eventProcessorHostSecond); + var runResult = await RunGenericScenario(eventProcessorHostSecond); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -672,7 +690,7 @@ async Task CheckpointBatchShouldHold() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - await RunGenericScenario(eventProcessorHostFirst, checkPointLastEvent: false, checkPointBatch: true); + await RunGenericScenario(eventProcessorHostFirst, checkpointLastEvent: false, checkpointBatch: true); // For the second time we initiate a host and this time it should pick from where the previous host left. // In other words, it shouldn't start receiving from start of the stream. @@ -682,10 +700,10 @@ async Task CheckpointBatchShouldHold() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents = await RunGenericScenario(eventProcessorHostSecond); + var runResult = await RunGenericScenario(eventProcessorHostSecond); // We should have received only 1 event from each partition. - Assert.False(receivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); + Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "One of the partitions didn't return exactly 1 event"); } [Fact] @@ -795,8 +813,8 @@ async Task NoCheckpointThenNewHostReadsFromStart() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents1 = await RunGenericScenario(eventProcessorHostFirst, checkPointLastEvent: false); - var totalEventsFromFirstHost = receivedEvents1.Sum(part => part.Value.Count); + var runResult1 = await RunGenericScenario(eventProcessorHostFirst, checkpointLastEvent: false); + var totalEventsFromFirstHost = runResult1.ReceivedEvents.Sum(part => part.Value.Count); // Second time we initiate a host, it should pick from where previous host left. // In other words, it shouldn't start receiving from start of the stream. @@ -806,12 +824,33 @@ async Task NoCheckpointThenNewHostReadsFromStart() this.EventHubConnectionString, this.StorageConnectionString, leaseContainerName); - var receivedEvents2 = await RunGenericScenario(eventProcessorHostSecond); - var totalEventsFromSecondHost = receivedEvents2.Sum(part => part.Value.Count); + var runResult2 = await RunGenericScenario(eventProcessorHostSecond); + var totalEventsFromSecondHost = runResult2.ReceivedEvents.Sum(part => part.Value.Count); // Second host should have received +partition-count messages. Assert.True(totalEventsFromFirstHost + PartitionIds.Count() == totalEventsFromSecondHost, - $"Second host received {receivedEvents2} events where as first host receive {receivedEvents1} events."); + $"Second host received {totalEventsFromSecondHost} events where as first host receive {totalEventsFromFirstHost} events."); + } + + /// + /// Checkpointing every message received should be Ok. No failures expected. + /// + /// + [Fact] + async Task CheckpointEveryMessageReceived() + { + var eventProcessorHost = new EventProcessorHost( + null, + PartitionReceiver.DefaultConsumerGroupName, + this.EventHubConnectionString, + this.StorageConnectionString, + this.LeaseContainerName); + + var runResult = await RunGenericScenario(eventProcessorHost, totalNumberOfEventsToSend: 10, + checkpointLastEvent: false, checkpoingEveryEvent: true); + + // Validate there were not failures. + Assert.True(runResult.NumberOfFailures == 0, $"RunResult returned with {runResult.NumberOfFailures} failures!"); } async Task>> DiscoverEndOfStream() @@ -828,11 +867,11 @@ async Task>> DiscoverEndOfStream() return partitions.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); } - async Task>> RunGenericScenario(EventProcessorHost eventProcessorHost, - EventProcessorOptions epo = null, int totalNumberOfEventsToSend = 1, bool checkPointLastEvent = true, - bool checkPointBatch = false) + async Task RunGenericScenario(EventProcessorHost eventProcessorHost, + EventProcessorOptions epo = null, int totalNumberOfEventsToSend = 1, bool checkpointLastEvent = true, + bool checkpointBatch = false, bool checkpoingEveryEvent = false) { - var receivedEvents = new ConcurrentDictionary>(); + var runResult = new GenericScenarioResult(); var lastReceivedAt = DateTime.Now; if (epo == null) @@ -858,27 +897,32 @@ async Task>> RunGenericScenario(EventProcesso string hostName = createArgs.Item1.Owner; processor.OnOpen += (_, partitionContext) => Log($"{hostName} > Partition {partitionId} TestEventProcessor opened"); processor.OnClose += (_, closeArgs) => Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}"); - processor.OnProcessError += (_, errorArgs) => Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + processor.OnProcessError += (_, errorArgs) => + { + Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}"); + Interlocked.Increment(ref runResult.NumberOfFailures); + }; processor.OnProcessEvents += (_, eventsArgs) => { int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0; Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)"); if (eventCount > 0) { - List events; - receivedEvents.TryGetValue(partitionId, out events); - if (events == null) + lastReceivedAt = DateTime.Now; + runResult.AddEvents(partitionId, eventsArgs.Item2.events); + + foreach (var e in eventsArgs.Item2.events) { - events = new List(); + // Checkpoint every event received? + if (checkpoingEveryEvent) + { + eventsArgs.Item1.CheckpointAsync(e).Wait(); + } } - - events.AddRange(eventsArgs.Item2.events); - receivedEvents[partitionId] = events; - lastReceivedAt = DateTime.Now; } - eventsArgs.Item2.checkPointLastEvent = checkPointLastEvent; - eventsArgs.Item2.checkPointBatch = checkPointBatch; + eventsArgs.Item2.checkPointLastEvent = checkpointLastEvent; + eventsArgs.Item2.checkPointBatch = checkpointBatch; }; }; @@ -905,10 +949,12 @@ async Task>> RunGenericScenario(EventProcesso await Task.Delay(1000); } - Log("Verifying at least an event was received by each partition"); + Log($"Verifying at least {totalNumberOfEventsToSend} event(s) was received by each partition"); foreach (var partitionId in PartitionIds) { - Assert.True(receivedEvents.ContainsKey(partitionId), $"Partition {partitionId} didn't receive any message!"); + Assert.True(runResult.ReceivedEvents.ContainsKey(partitionId) + && runResult.ReceivedEvents[partitionId].Count >= totalNumberOfEventsToSend, + $"Partition {partitionId} didn't receive expected number of messages. Expected {totalNumberOfEventsToSend}, received {runResult.ReceivedEvents[partitionId].Count}."); } Log("Success"); @@ -919,7 +965,7 @@ async Task>> RunGenericScenario(EventProcesso await eventProcessorHost.UnregisterEventProcessorAsync(); } - return receivedEvents.ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + return runResult; } protected async Task SendToPartitionAsync(string partitionId, string messageBody, string connectionString) @@ -944,5 +990,31 @@ protected void Log(string message) Console.WriteLine(log); } } + + class GenericScenarioResult + { + public ConcurrentDictionary> ReceivedEvents = new ConcurrentDictionary>(); + public int NumberOfFailures = 0; + + object listLock = new object(); + + public void AddEvents(string partitionId, IEnumerable addEvents) + { + List events; + this.ReceivedEvents.TryGetValue(partitionId, out events); + if (events == null) + { + events = new List(); + } + + // Account the case where 2 hosts racing by working on the same partition. + lock (listLock) + { + events.AddRange(addEvents); + } + + this.ReceivedEvents[partitionId] = events; + } + } } diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs index 1e62ac4..ea74511 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/NegativeCases.cs @@ -80,11 +80,11 @@ async Task NonexsistentEntity() Assert.NotNull(ex.InnerException); Assert.IsType(ex.InnerException); } + /// /// While processing events one event causes a failure. Host should be able to recover any error. /// /// - /// [Fact] async Task HostShouldRecoverWhenProcessEventsAsyncThrows() { diff --git a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs index 72ea6df..d019951 100644 --- a/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs +++ b/test/Microsoft.Azure.EventHubs.Processor.UnitTests/TestEventProcessor.cs @@ -48,11 +48,12 @@ Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable