Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EPH lease manager and checkpointing fixes and improvements #117

Merged
merged 20 commits into from
Mar 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4062f5b
Fixing build break introduced with preview check-in.
Jan 25, 2017
1e3efad
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotnet
Feb 1, 2017
e05450c
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotnet
Feb 1, 2017
29b6bd2
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotnet
Feb 2, 2017
347a348
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotnet
Feb 3, 2017
8ab7726
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotnet
Feb 7, 2017
317720d
Here is the list of the changes:
Feb 7, 2017
20068bd
Set PartitionContext's SequenceNumber and Offset only before deliveri…
Feb 8, 2017
aa2e860
CheckpointAsync(EventData eventData) should not allow checkpointing i…
Feb 8, 2017
88c98b0
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotne…
Feb 8, 2017
82b7957
Add unit test to cover checkpointing messages in the middle of a batch.
Feb 9, 2017
08f7d0d
Marking ICheckpointManager.UpdateCheckpointAsync(Checkpoint checkpoin…
Feb 9, 2017
767af99
Fix: ReceiveHandler and PartitionPump should gracefully handle except…
Feb 13, 2017
b97e940
HostShouldRecoverWhenProcessProcessEventsAsyncThrows unit test to unr…
Feb 14, 2017
fa864e8
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotne…
Mar 4, 2017
9ae7420
Cancelling swallow and trace change in ReceiveHandlerProcessErrorAsyn…
Mar 4, 2017
50c50b1
Remove unnecessary ExceptionHandled trace from EventSource.
Mar 4, 2017
10dbc2f
Merge branch 'dev' of https://github.com/Azure/azure-event-hubs-dotne…
Mar 9, 2017
2030a1a
+ Removing Obsolete signature from ICheckpointManager.
Mar 9, 2017
661c306
Adding Obsolete ICheckpointManager.UpdateCheckpointAsync(Checkpoint c…
Mar 13, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/Microsoft.Azure.EventHubs.Processor/AzureBlobLease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.Azure.EventHubs.Processor
class AzureBlobLease : Lease
{
// ctor needed for deserialization

internal AzureBlobLease()
{
}
Expand All @@ -36,14 +35,9 @@ internal AzureBlobLease(AzureBlobLease source, CloudBlockBlob blob) : base(sourc
}

// do not serialize

[JsonIgnore]
public CloudBlockBlob Blob { get; }

public string Offset { get; set; }

public long SequenceNumber { get; set; }

public override async Task<bool> IsExpired()
{
await this.Blob.FetchAttributesAsync().ConfigureAwait(false); // Get the latest metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ sealed class AzureStorageCheckpointLeaseManager : ICheckpointManager, ILeaseMana
static readonly TimeSpan storageMaximumExecutionTime = TimeSpan.FromMinutes(2);
static readonly TimeSpan leaseDuration = TimeSpan.FromSeconds(30);
static readonly TimeSpan leaseRenewInterval = TimeSpan.FromSeconds(10);
readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions();

// Lease renew calls shouldn't wait more than leaseRenewInterval
readonly BlobRequestOptions renewRequestOptions = new BlobRequestOptions()
{
ServerTimeout = leaseRenewInterval,
MaximumExecutionTime = TimeSpan.FromMinutes(1)
};

internal AzureStorageCheckpointLeaseManager(string storageConnectionString, string leaseContainerName, string storageBlobPrefix)
{
Expand Down Expand Up @@ -97,6 +103,12 @@ public async Task<Checkpoint> GetCheckpointAsync(string partitionId)
return checkpoint;
}

// Legacy single-argument overload. It performs no work and unconditionally throws
// NotImplementedException. The Obsolete attribute below is declared with error=true,
// so the compiler reports any use of this overload as an error rather than a warning;
// the attribute message points callers to UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint).
[Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
public Task UpdateCheckpointAsync(Checkpoint checkpoint)
{
// Thrown synchronously: no Task is ever returned from this overload.
throw new NotImplementedException();
}

public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionId)
{
// Normally the lease will already be created, checkpoint store is initialized after lease store.
Expand All @@ -106,13 +118,12 @@ public async Task<Checkpoint> CreateCheckpointIfNotExistsAsync(string partitionI
return checkpoint;
}

public async Task UpdateCheckpointAsync(Checkpoint checkpoint)
public async Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint)
{
// Need to fetch the most current lease data so that we can update it correctly.
AzureBlobLease lease = (AzureBlobLease)await GetLeaseAsync(checkpoint.PartitionId).ConfigureAwait(false);
lease.Offset = checkpoint.Offset;
lease.SequenceNumber = checkpoint.SequenceNumber;
await UpdateLeaseAsync(lease).ConfigureAwait(false);
AzureBlobLease newLease = new AzureBlobLease((AzureBlobLease)lease);
newLease.Offset = checkpoint.Offset;
newLease.SequenceNumber = checkpoint.SequenceNumber;
await this.UpdateLeaseAsync(newLease).ConfigureAwait(false);
}

public Task DeleteCheckpointAsync(string partitionId)
Expand Down Expand Up @@ -343,8 +354,8 @@ public Task<bool> RenewLeaseAsync(Lease lease)
async Task<bool> RenewLeaseCoreAsync(AzureBlobLease lease)
{
CloudBlockBlob leaseBlob = lease.Blob;
bool retval = true;
string partitionId = lease.PartitionId;

try
{
await leaseBlob.RenewLeaseAsync(AccessCondition.GenerateLeaseCondition(lease.Token), this.renewRequestOptions, null).ConfigureAwait(false);
Expand All @@ -353,15 +364,13 @@ async Task<bool> RenewLeaseCoreAsync(AzureBlobLease lease)
{
if (WasLeaseLost(partitionId, se))
{
retval = false;
}
else
{
throw;
throw new LeaseLostException(partitionId, se);
}

throw;
}

return retval;
return true;
}

public Task<bool> ReleaseLeaseAsync(Lease lease)
Expand All @@ -374,8 +383,8 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
ProcessorEventSource.Log.AzureStorageManagerInfo(this.host.Id, lease.PartitionId, "Releasing lease");

CloudBlockBlob leaseBlob = lease.Blob;
bool retval = true;
string partitionId = lease.PartitionId;

try
{
string leaseId = lease.Token;
Expand All @@ -391,15 +400,13 @@ async Task<bool> ReleaseLeaseCoreAsync(AzureBlobLease lease)
{
if (WasLeaseLost(partitionId, se))
{
retval = false;
}
else
{
throw;
throw new LeaseLostException(partitionId, se);
}

throw;
}

return retval;
return true;
}

public Task<bool> UpdateLeaseAsync(Lease lease)
Expand All @@ -419,16 +426,13 @@ async Task<bool> UpdateLeaseCoreAsync(AzureBlobLease lease)

string token = lease.Token;
if (string.IsNullOrEmpty(token))
{
return false;
}

// First, renew the lease to make sure the update will go through.
if (!await this.RenewLeaseAsync(lease).ConfigureAwait(false))
{
return false;
}

// First, renew the lease to make sure the update will go through.
await this.RenewLeaseAsync(lease).ConfigureAwait(false);

CloudBlockBlob leaseBlob = lease.Blob;
try
{
Expand Down Expand Up @@ -481,6 +485,7 @@ bool WasLeaseLost(string partitionId, StorageException se)
}
}
}

return retval;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface ICheckpointManager
/// <returns>Checkpoint info for the given partition, or null if none has been previously stored.</returns>
Task<Checkpoint> GetCheckpointAsync(string partitionId);

/// <summary>
/// Deprecated overload that takes only the checkpoint. It is marked obsolete with error=true,
/// so the compiler rejects any use of it; use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead.
/// </summary>
/// <param name="checkpoint">offset/sequenceNumber to update the store with.</param>
[System.Obsolete("Use UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint) instead", true)]
Task UpdateCheckpointAsync(Checkpoint checkpoint);

/// <summary>
/// Create the checkpoint for the given partition if it doesn't exist. Do nothing if it does exist.
/// The offset/sequenceNumber for a freshly-created checkpoint should be set to StartOfStream/0.
Expand All @@ -49,8 +52,9 @@ public interface ICheckpointManager
/// <summary>
/// Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
/// </summary>
/// <param name="lease">Partition information against which to perform a checkpoint.</param>
/// <param name="checkpoint">offset/sequenceNumber to update the store with.</param>
Task UpdateCheckpointAsync(Checkpoint checkpoint);
Task UpdateCheckpointAsync(Lease lease, Checkpoint checkpoint);
Copy link
Contributor

@jtaubensee jtaubensee Feb 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a breaking change? #Closed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and no for .NET Core. Yes if anyone has implemented their own checkpoint manager, but probably no one has. The same change is also needed for Java. It is more dreadful there.


In reply to: 100396157 [](ancestors = 100396157)

Copy link
Contributor

@jtaubensee jtaubensee Feb 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we deprecate this signature, and add the new one? That will allow us to release this as a minor change according to SemVer. #Resolved


/// <summary>
/// Delete the stored checkpoint for the given partition. If there is no stored checkpoint for the
Expand Down
7 changes: 6 additions & 1 deletion src/Microsoft.Azure.EventHubs.Processor/Lease.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ protected Lease(Lease source)
this.Token = source.Token;
}

public string Offset { get; set; }

public long SequenceNumber { get; set; }

public string PartitionId { get; set; }

public string Owner { get; set; }
Expand All @@ -36,7 +40,8 @@ protected Lease(Lease source)

public virtual Task<bool> IsExpired()
{
// this function is meaningless in the base class
// By default lease never expires.
// Deriving class will implement the lease expiry logic.
return Task.FromResult(false);
}

Expand Down
66 changes: 23 additions & 43 deletions src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,51 +44,19 @@ public string Owner

object ThisLock { get; }

/// <summary>
/// Updates the offset/sequenceNumber in the PartitionContext with the values in the received EventData object.
///
/// <para>Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be
/// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber
/// is equal or greater, the new offset will be as well.</para>
/// </summary>
/// <param name="eventData">A received EventData with valid offset and sequenceNumber</param>
/// <exception cref="ArgumentOutOfRangeException">If the sequenceNumber in the provided event is less than the current value</exception>
internal void SetOffsetAndSequenceNumber(EventData eventData)
{
if (eventData == null)
{
throw new ArgumentNullException(nameof(eventData));
}

this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber);
}

/// <summary>
/// Updates the offset/sequenceNumber in the PartitionContext.
///
/// <para>These two values are closely tied and must be updated in an atomic fashion, hence the combined setter.
/// Since offset is a string it cannot be compared easily, but sequenceNumber is checked. The new sequenceNumber must be
/// at least the same as the current value or the entire assignment is aborted. It is assumed that if the new sequenceNumber
/// is equal or greater, the new offset will be as well.</para>
/// </summary>
/// <param name="offset">New offset value</param>
/// <param name="sequenceNumber">New sequenceNumber value </param>
/// <exception cref="ArgumentOutOfRangeException">If the sequenceNumber in the provided event is less than the current value</exception>
void SetOffsetAndSequenceNumber(string offset, long sequenceNumber)
{
lock(this.ThisLock)
lock (this.ThisLock)
{
if (sequenceNumber >= this.SequenceNumber)
{
this.Offset = offset;
this.SequenceNumber = sequenceNumber;
}
else
{
throw new ArgumentOutOfRangeException("offset/sequenceNumber", $"New offset {offset}/{sequenceNumber} is less than previous {this.Offset}/{this.SequenceNumber}");
}
this.Offset = eventData.SystemProperties.Offset;
this.SequenceNumber = eventData.SystemProperties.SequenceNumber;
}
}
}

internal async Task<object> GetInitialOffsetAsync() // throws InterruptedException, ExecutionException
{
Expand Down Expand Up @@ -132,7 +100,6 @@ internal async Task<object> GetInitialOffsetAsync() // throws InterruptedExcepti
/// <summary>
/// Writes the current offset and sequenceNumber to the checkpoint store via the checkpoint manager.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">If this.sequenceNumber is less than the last checkpointed value</exception>
public Task CheckpointAsync()
{
// Capture the current offset and sequenceNumber. Synchronize to be sure we get a matched pair
Expand All @@ -153,10 +120,21 @@ public Task CheckpointAsync()
/// values to the checkpoint store via the checkpoint manager.
/// </summary>
/// <param name="eventData">A received EventData with valid offset and sequenceNumber</param>
/// <exception cref="ArgumentNullException">If supplied eventData is null</exception>
/// <exception cref="ArgumentOutOfRangeException">If the sequenceNumber is less than the last checkpointed value</exception>
public Task CheckpointAsync(EventData eventData)
{
this.SetOffsetAndSequenceNumber(eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber);
if (eventData == null)
{
throw new ArgumentNullException("eventData");
}

// We have never seen this sequence number yet
if (eventData.SystemProperties.SequenceNumber > this.SequenceNumber)
{
throw new ArgumentOutOfRangeException("eventData.SystemProperties.SequenceNumber");
}

return this.PersistCheckpointAsync(new Checkpoint(this.PartitionId, eventData.SystemProperties.Offset, eventData.SystemProperties.SequenceNumber));
}

Expand All @@ -165,7 +143,7 @@ public override string ToString()
return $"PartitionContext({this.EventHubPath}/{this.ConsumerGroupName}/{this.PartitionId}/{this.SequenceNumber})";
}

async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOfRangeException, InterruptedException, ExecutionException
async Task PersistCheckpointAsync(Checkpoint checkpoint)
{
ProcessorEventSource.Log.PartitionPumpCheckpointStart(this.host.Id, checkpoint.PartitionId, checkpoint.Offset, checkpoint.SequenceNumber);
try
Expand All @@ -175,12 +153,14 @@ async Task PersistCheckpointAsync(Checkpoint checkpoint) // throws ArgumentOutOf
{
if (inStoreCheckpoint == null)
{
inStoreCheckpoint = await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false);
await this.host.CheckpointManager.CreateCheckpointIfNotExistsAsync(checkpoint.PartitionId).ConfigureAwait(false);
}

inStoreCheckpoint.Offset = checkpoint.Offset;
inStoreCheckpoint.SequenceNumber = checkpoint.SequenceNumber;
await this.host.CheckpointManager.UpdateCheckpointAsync(inStoreCheckpoint).ConfigureAwait(false);
await this.host.CheckpointManager.UpdateCheckpointAsync(this.Lease, checkpoint).ContinueWith((obj) =>
{
this.Lease.Offset = checkpoint.Offset;
this.Lease.SequenceNumber = checkpoint.SequenceNumber;
}, TaskContinuationOptions.OnlyOnRanToCompletion).ConfigureAwait(false);
}
else
{
Expand Down
Loading