Skip to content

Commit

Permalink
[azeventhubs,azservicebus] Some API cleanup, renames (#20754)
Browse files Browse the repository at this point in the history
* Adding options to UpdateCheckpoint(), just for future potential expansion
* Make Offset an int64, not a *int64 (it's not optional, it'll always come back with ReceivedEvents)
* Adding more logging into the checkpoint store.
* Point all imports at the production go-amqp
  • Loading branch information
richardpark-msft authored May 8, 2023
1 parent bd3b467 commit 132a01a
Show file tree
Hide file tree
Showing 132 changed files with 150 additions and 22,444 deletions.
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/amqp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package azeventhubs
import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs.
Expand Down
8 changes: 4 additions & 4 deletions sdk/messaging/azeventhubs/checkpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type CheckpointStore interface {
// ListOwnership lists all ownerships.
ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)

// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
UpdateCheckpoint(ctx context.Context, checkpoint Checkpoint, options *UpdateCheckpointOptions) error
// SetCheckpoint updates a specific checkpoint with a sequence and offset.
SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}

// Ownership tracks which consumer owns a particular partition.
Expand Down Expand Up @@ -59,8 +59,8 @@ type ListOwnershipOptions struct {
// For future expansion
}

// UpdateCheckpointOptions contains optional parameters for the UpdateCheckpoint function
type UpdateCheckpointOptions struct {
// SetCheckpointOptions contains optional parameters for the UpdateCheckpoint function
type SetCheckpointOptions struct {
// For future expansion
}

Expand Down
9 changes: 7 additions & 2 deletions sdk/messaging/azeventhubs/checkpoints/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
Expand Down Expand Up @@ -63,6 +64,8 @@ func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []aze
if bloberror.HasCode(err,
bloberror.ConditionNotMet, // updated before we could update it
bloberror.BlobAlreadyExists) { // created before we could create it

log.Writef(azeventhubs.EventConsumer, "[%s] skipping %s because: %s", po.OwnerID, po.PartitionID, err)
continue
}

Expand Down Expand Up @@ -180,10 +183,10 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s
return ownerships, nil
}

// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
// SetCheckpoint updates a specific checkpoint with a sequence and offset.
//
// NOTE: This function doesn't attempt to prevent simultaneous checkpoint updates - ownership is assumed.
func (b *BlobStore) UpdateCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.UpdateCheckpointOptions) error {
func (b *BlobStore) SetCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.SetCheckpointOptions) error {
blobName, err := nameForCheckpointBlob(checkpoint)

if err != nil {
Expand All @@ -199,6 +202,7 @@ func (b *BlobStore) setOwnershipMetadata(ctx context.Context, blobName string, o
blobClient := b.cc.NewBlockBlobClient(blobName)

if ownership.ETag != nil {
log.Writef(azeventhubs.EventConsumer, "[%s] claiming ownership for %s with etag %s", ownership.OwnerID, ownership.PartitionID, string(*ownership.ETag))
setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, &blob.SetMetadataOptions{
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &blob.ModifiedAccessConditions{
Expand All @@ -214,6 +218,7 @@ func (b *BlobStore) setOwnershipMetadata(ctx context.Context, blobName string, o
return setMetadataResp.LastModified, *setMetadataResp.ETag, nil
}

log.Writef(azeventhubs.EventConsumer, "[%s] claiming ownership for %s with NO etags", ownership.PartitionID, ownership.OwnerID)
uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{
Metadata: blobMetadata,
AccessConditions: &blob.AccessConditions{
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azeventhubs/checkpoints/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBlobStore_Checkpoints(t *testing.T) {
require.NoError(t, err)
require.Empty(t, checkpoints)

err = store.UpdateCheckpoint(context.Background(), azeventhubs.Checkpoint{
err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
FullyQualifiedNamespace: "ns.servicebus.windows.net",
Expand All @@ -57,7 +57,7 @@ func TestBlobStore_Checkpoints(t *testing.T) {

// There's a code path to allow updating the blob after it's been created but without an etag
// in which case it just updates it.
err = store.UpdateCheckpoint(context.Background(), azeventhubs.Checkpoint{
err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
FullyQualifiedNamespace: "ns.servicebus.windows.net",
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/consumer_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestConsumerClient_Recovery(t *testing.T) {
require.NoError(t, err)
require.EqualValues(t, 1, len(events))

t.Logf("[%s] Received seq:%d, offset:%d", sr.PartitionID, events[0].SequenceNumber, *events[0].Offset)
t.Logf("[%s] Received seq:%d, offset:%d", sr.PartitionID, events[0].SequenceNumber, events[0].Offset)

require.Equal(t, fmt.Sprintf("event 1 for partition %s", sr.PartitionID), string(events[0].Body))
}(i, sr)
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/event_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/eh"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// EventData is an event that can be sent, using the ProducerClient, to an Event Hub.
Expand Down Expand Up @@ -52,7 +52,7 @@ type ReceivedEventData struct {
PartitionKey *string

// Offset is the offset of the event.
Offset *int64
Offset int64

// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
Expand Down Expand Up @@ -177,7 +177,7 @@ func updateFromAMQPAnnotations(src *amqp.Message, dest *ReceivedEventData) error
case offsetNumberAnnotation:
if offsetStr, ok := v.(string); ok {
if offset, err := strconv.ParseInt(offsetStr, 10, 64); err == nil {
dest.Offset = &offset
dest.Offset = offset
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/event_data_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// ErrEventDataTooLarge is returned when a message cannot fit into a batch when using the [azeventhubs.EventDataBatch.AddEventData] function.
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/event_data_batch_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/mock"
"github.com/Azure/go-amqp"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/event_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)

Expand All @@ -20,7 +20,7 @@ func TestEventData_Annotations(t *testing.T) {
require.Empty(t, re.Body)
require.Nil(t, re.EnqueuedTime)
require.Equal(t, int64(0), re.SequenceNumber)
require.Nil(t, re.Offset)
require.Zero(t, re.Offset)
require.Nil(t, re.PartitionKey)
})

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestEventData_newReceivedEventData(t *testing.T) {
SystemProperties: map[string]any{
"hello": "world",
},
Offset: to.Ptr[int64](102),
Offset: int64(102),
PartitionKey: to.Ptr("partition key"),
RawAMQPMessage: &AMQPAnnotatedMessage{
Properties: &AMQPAnnotatedMessageProperties{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func Example_migrateCheckpoints() {
newCheckpoint.Offset = &offset
newCheckpoint.SequenceNumber = &oldCheckpoint.Checkpoint.SequenceNumber

if err := newCheckpointStore.UpdateCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
if err := newCheckpointStore.SetCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
panic(err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionCl

// Updates the checkpoint with the latest event received. If processing needs to restart
// it will restart from this point, automatically.
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
Expand All @@ -154,7 +154,7 @@ func shutdownPartitionResources(partitionClient *azeventhubs.ProcessorPartitionC
defer partitionClient.Close(context.TODO())
}

func createClientsForExample(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.ConsumerClient, *checkpoints.BlobStore, error) {
func createClientsForExample(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.ConsumerClient, azeventhubs.CheckpointStore, error) {
// NOTE: the storageContainerName must exist before the checkpoint store can be used.
azBlobContainerClient, err := container.NewClientFromConnectionString(storageConnectionString, storageContainerName, nil)

Expand Down
3 changes: 2 additions & 1 deletion sdk/messaging/azeventhubs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/Azure/go-amqp v1.0.0
github.com/golang/mock v1.6.0
github.com/joho/godotenv v1.4.0
github.com/stretchr/testify v1.7.1
nhooyr.io/websocket v1.8.7
nhooyr.io/websocket v1.8.7
)

require (
Expand Down
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0/go.mod h1:Y3gnVwfaz8h6L1YHar+NfWORtBoVUSB5h4GlGkdeF7Q=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag=
github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA=
github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_InMemoryCheckpointStore_Checkpoints(t *testing.T) {
require.Empty(t, checkpoints)

for i := int64(0); i < 5; i++ {
err = store.UpdateCheckpoint(context.Background(), Checkpoint{
err = store.SetCheckpoint(context.Background(), Checkpoint{
FullyQualifiedNamespace: "ns",
EventHubName: "eh",
ConsumerGroup: "cg",
Expand Down Expand Up @@ -269,7 +269,7 @@ func (cps *testCheckpointStore) ListOwnership(ctx context.Context, fullyQualifie
return ownerships, nil
}

func (cps *testCheckpointStore) UpdateCheckpoint(ctx context.Context, checkpoint Checkpoint, options *UpdateCheckpointOptions) error {
func (cps *testCheckpointStore) SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error {
cps.checkpointsMu.Lock()
defer cps.checkpointsMu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/amqp_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"context"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

type FakeNSForPartClient struct {
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/amqpwrap/amqpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"errors"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// AMQPReceiver is implemented by *amqp.Receiver
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/amqpwrap/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package amqpwrap
import (
"context"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

// RPCResponse is the simplified response structure from an RPC like call
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/cbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/auth"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/cbs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/auth"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/mock"
"github.com/Azure/go-amqp"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (inf *processorStressTest) receiveForever(ctx context.Context, partClient *

if len(events) > 0 {
// we're okay, let's update our checkpoint
if err := partClient.UpdateCheckpoint(ctx, events[len(events)-1]); err != nil {
if err := partClient.UpdateCheckpoint(ctx, events[len(events)-1], nil); err != nil {
logger("Fatal error updating checkpoint: %s", err)
inf.TC.TrackException(err)
panic(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func initCheckpointStore(ctx context.Context, containerName string, testData *st
newCheckpoint.SequenceNumber = &partProps.LastEnqueuedSequenceNumber
}

if err = cps.UpdateCheckpoint(ctx, newCheckpoint, nil); err != nil {
if err = cps.SetCheckpoint(ctx, newCheckpoint, nil); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
)

type errNonRetriable struct {
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)

Expand Down
22 changes: 0 additions & 22 deletions sdk/messaging/azeventhubs/internal/go-amqp/LICENSE

This file was deleted.

Loading

0 comments on commit 132a01a

Please sign in to comment.