Skip to content

Commit

Permalink
Reduce the number of storage calls in lease manager (#357)
Browse files Browse the repository at this point in the history
* Couple improvements in Azure Lease Manager to reduce numberof storage calls.

* N/A as partition id

* Go with default timeout

* Moving to most recent AMQP release

* Fix flaky EPH test

* Adding 30 seconds default operation timeout back to tests.

* Reducing EPH to storage IO calls.

* Couple more fixes

* .

* Set token for owned leases.

* Refresh lease before acquiring in processor host.

* Fix metada removal order during lease release.

* Update lease token only for already running pumps to avoid resetting receiver position data.

* FetchAttributesAsync of blob as part of GetAllLeasesAsync() call.

* Refresh lease before attempting to steal

* Don't retry if we already lost the lease during receiver open.

* Don't attempt to steal if owner has changed from the calculation time to refresh time.

* -

* Partition pump to close when hit ReceiverDisconnectedException since this is not recoverable.

* -

* Ignore any failure during releasing the lease

* Don't update pump token if token is empty

* Nullify the owner on the lease in case this host lost it.

* Increment ourLeaseCount when a lease is acquired.

* Correcting task list

* No need to assign pump lease token to downloaded lease.

* comment update

* comment update

* Clear ownership on partial acquisition.

* Clear ownership on partial acquisition.

* Make sure we don't leave the lease as owned if acquisition failed.

* Adding logs to debug lease corruption bug

* Adding logs to debug lease corruption bug

* Small fix at steal lease check

* Protect subject iterator variable during task creation in for loops.

* .

* Renew lease right after ChangeLease call

* Don't create pump if partition expired or already moved to some other host.

* Use refreshed lease while creating partition pump.

* Remove temporary debug logs.

* Addressing SJ's comments

* Remove obsolete
  • Loading branch information
serkantkaraca authored Feb 11, 2019
1 parent 1c4c4c7 commit cec369a
Show file tree
Hide file tree
Showing 15 changed files with 362 additions and 190 deletions.
22 changes: 16 additions & 6 deletions src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace Microsoft.Azure.EventHubs.Processor

class AzureBlobLease : Lease
{
readonly bool isOwned;

// ctor needed for deserialization
internal AzureBlobLease()
{
Expand All @@ -17,32 +19,40 @@ internal AzureBlobLease()
internal AzureBlobLease(string partitionId, CloudBlockBlob blob) : base(partitionId)
{
this.Blob = blob;
}
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
}

internal AzureBlobLease(string partitionId, string owner, CloudBlockBlob blob) : base(partitionId)
{
this.Blob = blob;
this.Owner = owner;
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
}

internal AzureBlobLease(AzureBlobLease source)
internal AzureBlobLease(AzureBlobLease source)
: base(source)
{
this.Offset = source.Offset;
this.SequenceNumber = source.SequenceNumber;
this.Blob = source.Blob;
this.isOwned = source.isOwned;
}

internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(source)
{
this.Offset = source.Offset;
this.SequenceNumber = source.SequenceNumber;
this.Blob = blob;
this.isOwned = blob.Properties.LeaseState == LeaseState.Leased;
}

// do not serialize
[JsonIgnore]
public CloudBlockBlob Blob { get; }

public override async Task<bool> IsExpired()
public override Task<bool> IsExpired()
{
await this.Blob.FetchAttributesAsync().ConfigureAwait(false); // Get the latest metadata
var currentState = this.Blob.Properties.LeaseState;
return currentState != LeaseState.Leased;
return Task.FromResult(!this.isOwned);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Microsoft.Azure.EventHubs.Processor
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs.Primitives;
using Newtonsoft.Json;
Expand All @@ -13,6 +14,8 @@ namespace Microsoft.Azure.EventHubs.Processor

class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseManager
{
static string MetaDataOwnerName = "OWNINGHOST";

EventProcessorHost host;
TimeSpan leaseDuration;
TimeSpan leaseRenewInterval;
Expand Down Expand Up @@ -107,7 +110,9 @@ public Task<bool> CheckpointStoreExistsAsync()

public Task<bool> CreateCheckpointStoreIfNotExistsAsync()
{
return CreateLeaseStoreIfNotExistsAsync();
// Because we control the caller, we know that this method will only be called after createLeaseStoreIfNotExists.
// In this implementation, it's the same store, so the store will always exist if execution reaches here.
return Task.FromResult(true);
}

public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
Expand All @@ -126,19 +131,10 @@ public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
return checkpoint;
}

[Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
public Task UpdateCheckpointAsync(Checkpoint checkpoint)
{
throw new NotImplementedException();
}

public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
public Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
{
// Normally the lease will already be created, checkpoint store is initialized after lease store.
AzureBlobLease lease = (AzureBlobLease)await CreateLeaseIfNotExistsAsync(partitionId).ConfigureAwait(false);
Checkpoint checkpoint = new Checkpoint(partitionId, lease.Offset, lease.SequenceNumber);

return checkpoint;
return Task.FromResult<Checkpoint>(null);
}

public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint)
Expand Down Expand Up @@ -228,26 +224,45 @@ public async Task<bool> DeleteLeaseStoreAsync()

public async Task<Lease> GetLeaseAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
{
AzureBlobLease retval = null;

CloudBlockBlob leaseBlob = GetBlockBlobReference(partitionId);

if (await leaseBlob.ExistsAsync(null, this.operationContext).ConfigureAwait(false))
{
retval = await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false);
}
await leaseBlob.FetchAttributesAsync().ConfigureAwait(false);

return retval;
return await DownloadLeaseAsync(partitionId, leaseBlob).ConfigureAwait(false);
}

public IEnumerable<Task<Lease>> GetAllLeases()
public async Task<IEnumerable<Lease>> GetAllLeasesAsync()
{
IEnumerable<string> partitionIds = this.host.PartitionManager.GetPartitionIdsAsync().WaitAndUnwrapException();
var leaseList = new List<Lease>();
BlobContinuationToken continuationToken = null;

foreach (string id in partitionIds)
do
{
yield return GetLeaseAsync(id);
}
var leaseBlobsResult = await this.consumerGroupDirectory.ListBlobsSegmentedAsync(
true,
BlobListingDetails.Metadata,
null,
continuationToken,
null,
this.operationContext);

foreach (CloudBlockBlob leaseBlob in leaseBlobsResult.Results)
{
// Try getting owner name from existing blob.
// This might return null when run on the existing lease after SDK upgrade.
leaseBlob.Metadata.TryGetValue(MetaDataOwnerName, out var owner);

// Discover partition id from URI path of the blob.
var partitionId = leaseBlob.Uri.AbsolutePath.Split('/').Last();

leaseList.Add(new AzureBlobLease(partitionId, owner, leaseBlob));
}

continuationToken = leaseBlobsResult.ContinuationToken;

} while (continuationToken != null);

return leaseList;
}

public async Task<Lease> CreateLeaseIfNotExistsAsync(string partitionId) // throws URISyntaxException, IOException, StorageException
Expand Down Expand Up @@ -315,6 +330,7 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
string partitionId = lease.PartitionId;
try
{
bool renewLease = false;
string newToken;
await leaseBlob.FetchAttributesAsync(null, null, this.operationContext).ConfigureAwait(false);
if (leaseBlob.Properties.LeaseState == LeaseState.Leased)
Expand All @@ -332,6 +348,7 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
}

ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to ChangeLease");
renewLease = true;
newToken = await leaseBlob.ChangeLeaseAsync(
newLeaseId,
AccessCondition.GenerateLeaseCondition(lease.Token),
Expand All @@ -341,29 +358,38 @@ async Task<bool> AcquireLeaseCoreAsync(AzureBlobLease lease)
else
{
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.HostName, lease.PartitionId, "Need to AcquireLease");

try
{
newToken = await leaseBlob.AcquireLeaseAsync(leaseDuration, newLeaseId, null, null, this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
when (se.RequestInformation != null
&& se.RequestInformation.ErrorCode.Equals(BlobErrorCodeStrings.LeaseAlreadyPresent, StringComparison.OrdinalIgnoreCase))
{
// Either some other host grabbed the lease or checkpoint call renewed it.
return false;
}
newToken = await leaseBlob.AcquireLeaseAsync(
leaseDuration,
newLeaseId,
null,
null,
this.operationContext).ConfigureAwait(false);
}

lease.Token = newToken;
lease.Owner = this.host.HostName;
lease.IncrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host

// Renew lease here if needed?
// ChangeLease doesn't renew so we should avoid lease expiring before next renew interval.
if (renewLease)
{
await this.RenewLeaseCoreAsync(lease).ConfigureAwait(false);
}

await leaseBlob.UploadTextAsync(
JsonConvert.SerializeObject(lease),
null,
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);

// Update owner in the metadata.
lease.Blob.Metadata[MetaDataOwnerName] = lease.Owner;
await lease.Blob.SetMetadataAsync(
AccessCondition.GenerateLeaseCondition(lease.Token),
null,
this.operationContext).ConfigureAwait(false);
}
catch (StorageException se)
{
Expand Down Expand Up @@ -418,6 +444,14 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
Token = string.Empty,
Owner = string.Empty
};

// Remove owner in the metadata.
leaseBlob.Metadata.Remove(MetaDataOwnerName);
await leaseBlob.SetMetadataAsync(
AccessCondition.GenerateLeaseCondition(leaseId),
null,
this.operationContext);

await leaseBlob.UploadTextAsync(
JsonConvert.SerializeObject(releasedCopy),
null,
Expand Down Expand Up @@ -478,7 +512,7 @@ await leaseBlob.UploadTextAsync(
return true;
}

async Task<AzureBlobLease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException
async Task<Lease> DownloadLeaseAsync(string partitionId, CloudBlockBlob blob) // throws StorageException, IOException
{
string jsonLease = await blob.DownloadTextAsync().ConfigureAwait(false);

Expand Down Expand Up @@ -513,15 +547,7 @@ Exception HandleStorageException(string partitionId, StorageException se)

CloudBlockBlob GetBlockBlobReference(string partitionId)
{
CloudBlockBlob leaseBlob = this.consumerGroupDirectory.GetBlockBlobReference(partitionId);

// Fixed, keeping workaround commented until full validation.
// GetBlockBlobReference creates a new ServiceClient thus resets options.
// Because of this we lose settings like MaximumExecutionTime on the client.
// Until storage addresses the issue we need to override it here once more.
// Tracking bug: https://github.com/Azure/azure-storage-net/issues/398
// leaseBlob.ServiceClient.DefaultRequestOptions = this.storageClient.DefaultRequestOptions;
return leaseBlob;
return this.consumerGroupDirectory.GetBlockBlobReference(partitionId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ protected override async Task OnOpenAsync()
lastException = e;
ProcessorEventSource.Log.PartitionPumpWarning(
this.Host.HostName, this.PartitionContext.PartitionId, "Failure creating client or receiver, retrying", e.ToString());

// Don't retry if we already lost the lease.
if (e is ReceiverDisconnectedException)
{
break;
}

retryCount++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ namespace Microsoft.Azure.EventHubs.Processor
{
internal static class EventProcessorHostActionStrings
{
internal static readonly string DownloadingLeases = "Downloading Leases";
internal static readonly string CheckingLeases = "Checking Leases";
internal static readonly string RenewingLease = "Renewing Lease";
internal static readonly string ReleasingLease = "Releasing Lease";
internal static readonly string StealingLease = "Stealing Lease";
internal static readonly string CreatingLease = "Creating Lease";
internal static readonly string ClosingEventProcessor = "Closing Event Processor";
Expand Down
7 changes: 0 additions & 7 deletions src/Microsoft.Azure.EventHubs.Processor/ICheckpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ public interface ICheckpointManager
/// <returns>Checkpoint info for the given partition, or null if none has been previously stored.</returns>
Task<Checkpoint> GetCheckpointAsync(string partitionId);

/// <summary>
/// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
/// </summary>
/// <param name="checkpoint">offset/sequeceNumber to update the store with.</param>
[System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
Task UpdateCheckpointAsync(Checkpoint checkpoint);

/// <summary>
/// Create the checkpoint for the given partition if it doesn't exist. Do nothing if it does exist.
/// The offset/sequenceNumber for a freshly-created checkpoint should be set to StartOfStream/0.
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.Azure.EventHubs.Processor/ILeaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public interface ILeaseManager
/// A typical implementation could just call GetLeaseAsync() on all partitions.
/// </summary>
/// <returns>list of lease info.</returns>
IEnumerable<Task<Lease>> GetAllLeases();
Task<IEnumerable<Lease>> GetAllLeasesAsync();

/// <summary>
/// Create in the store the lease info for the given partition, if it does not exist. Do nothing if it does exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="WindowsAzure.Storage" Version="9.2.0" />
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net461' ">
Expand Down
Loading

0 comments on commit cec369a

Please sign in to comment.