Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] GeoDR - OffsetString #47133

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eng/ApiListing.exclude-attributes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ T:System.Runtime.CompilerServices.AsyncStateMachineAttribute
T:System.Runtime.CompilerServices.CompilerGeneratedAttribute
T:System.Runtime.CompilerServices.NullableContextAttribute
T:System.Runtime.CompilerServices.NullableAttribute
T:System.Runtime.CompilerServices.IsReadOnlyAttribute
jsquire marked this conversation as resolved.
Show resolved Hide resolved
T:Azure.Core.CodeGenSuppressAttribute
T:Azure.Core.CodeGenModelAttribute
T:Azure.Core.CodeGenMemberAttribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down Expand Up @@ -71,6 +73,8 @@ public BlobCheckpointStore(Azure.Storage.Blobs.BlobContainerClient blobContainer
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public EventProcessorClient(Azure.Storage.Blobs.BlobContainerClient checkpointSt
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
protected override System.Threading.Tasks.Task UpdateCheckpointAsync(string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
protected override System.Threading.Tasks.Task ValidateProcessingPreconditions(System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down Expand Up @@ -71,6 +73,8 @@ public BlobCheckpointStore(Azure.Storage.Blobs.BlobContainerClient blobContainer
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ClaimOwnershipAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership> desiredOwnership, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<Azure.Messaging.EventHubs.Primitives.EventProcessorCheckpoint> GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.Primitives.EventProcessorPartitionOwnership>> ListOwnershipAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, System.Threading.CancellationToken cancellationToken) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
[System.ObsoleteAttribute("The Event Hubs service does not guarantee a numeric offset for all resource configurations. Checkpoints created from a numeric offset may not work in all cases going forward. Please use a string-based offset via the overload accepting 'CheckpointPosition' instead.", false)]
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, long offset, long? sequenceNumber, System.Threading.CancellationToken cancellationToken) { throw null; }
public override System.Threading.Tasks.Task UpdateCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, string clientIdentifier, Azure.Messaging.EventHubs.Processor.CheckpointPosition startingPosition, System.Threading.CancellationToken cancellationToken) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ EventData eventData = EventHubsModelFactory.EventData(
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offsetString: "1500:1:3344.1",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

// This creates a new instance of ProcessEventArgs to pass into the handler directly.
Expand Down Expand Up @@ -110,7 +110,7 @@ TimerCallback dispatchEvent = async _ =>
systemProperties: new Dictionary<string, object>(), //arbitrary value
partitionKey: "sample-key",
sequenceNumber: 1000,
offset: 1500,
offsetString: "1500:1:1111",
enqueuedTime: DateTimeOffset.Parse("11:36 PM"));

ProcessEventArgs eventArgs = new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Description>Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/</Description>
<Version>5.12.0-beta.2</Version>
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually.-->
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually. -->
<ApiCompatVersion>5.11.5</ApiCompatVersion>
<PackageTags>Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
Expand All @@ -16,7 +16,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<!-- TEMP TEMP -->
<ProjectReference Include="../../Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj" />
<!-- PackageReference Include="Azure.Messaging.EventHubs" /-->
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.Amqp" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ partial void InvalidCheckpointFound(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with the checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with the checkpoint.</param>
/// <param name="offset">The offset associated with the checkpoint.</param>
/// <param name="exception">The exception that occurred.</param>
///
Expand All @@ -107,10 +106,9 @@ partial void UpdateCheckpointError(string partitionId,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset,
Exception exception) =>
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, exception.Message, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to update a checkpoint has completed.
Expand All @@ -122,7 +120,6 @@ partial void UpdateCheckpointError(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointComplete(string partitionId,
Expand All @@ -131,9 +128,8 @@ partial void UpdateCheckpointComplete(string partitionId,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to create/update a checkpoint has started.
Expand All @@ -145,7 +141,6 @@ partial void UpdateCheckpointComplete(string partitionId,
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="clientIdentifier">The unique identifier of the client that authored this checkpoint.</param>
/// <param name="sequenceNumber">The sequence number associated with this checkpoint.</param>
/// <param name="replicationSegment">The replication segment associated with this checkpoint.</param>
/// <param name="offset">The offset associated with this checkpoint.</param>
///
partial void UpdateCheckpointStart(string partitionId,
Expand All @@ -154,9 +149,8 @@ partial void UpdateCheckpointStart(string partitionId,
string consumerGroup,
string clientIdentifier,
string sequenceNumber,
string replicationSegment,
string offset) =>
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, replicationSegment, offset);
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, clientIdentifier, sequenceNumber, offset);

/// <summary>
/// Indicates that an attempt to retrieve claim partition ownership has completed.
Expand Down
Loading
Loading