From 06ce04a7b44645ba0d78a31cd8ec7d6368600116 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Thu, 29 Apr 2021 08:29:22 +0200 Subject: [PATCH] refactor eventhub checkpointStrategy selection Signed-off-by: Christian Leinweber --- .../azure/azure_eventhub_checkpoint.go | 13 +++--- pkg/scalers/azure/azure_eventhub_test.go | 41 +++++-------------- 2 files changed, 18 insertions(+), 36 deletions(-) diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index 432b3b80819..0b71e866fe3 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -7,6 +7,7 @@ import ( "fmt" "net/url" "strconv" + "strings" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/imdario/mergo" @@ -76,22 +77,23 @@ func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, } func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { - if info.CheckpointStrategy == "GoSdk" { + switch { + case (info.CheckpointStrategy == "GoSdk"): return &goSdkCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - } else if info.CheckpointStrategy == "BlobMetadata" { + case (info.CheckpointStrategy == "BlobMetadata"): return &blobMetadataCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, } - } else if info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == "" { + case (info.CheckpointStrategy == "AzureWebJob" || info.BlobContainer == ""): return &azureWebjobCheckpointer{ containerName: "azure-webjobs-eventhub", partitionID: partitionID, } - } else { + default: return &defaultCheckpointer{ containerName: info.BlobContainer, partitionID: partitionID, @@ -129,8 +131,7 @@ func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*u return nil, err } - path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) - + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, strings.ToLower(info.EventHubConsumerGroup), checkpointer.partitionID)) return path, nil } diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index 8950dd7c499..6d3c93ebd68 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -24,7 +24,7 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { partitionID := "0" offset := "1001" - consumerGroup := "$Default" + consumerGroup := "$Default1" sequencenumber := int64(1) @@ -62,9 +62,9 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) { return } - partitionID := "0" + partitionID := "1" offset := "1005" - consumerGroup := "$Default" + consumerGroup := "$Default2" sequencenumber := int64(1) @@ -92,7 +92,7 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) { BlobContainer: containerName, } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -103,9 +103,9 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T return } - partitionID := "0" + partitionID := "2" offset := "1006" - consumerGroup := "$Default" + consumerGroup := "$Default3" sequencenumber := int64(1) @@ -133,7 +133,7 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T BlobContainer: containerName, } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -144,7 +144,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { return } - partitionID := "0" + partitionID := "4" offset := "1002" consumerGroup := "$default" @@ -178,7 +178,7 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { CheckpointStrategy: "BlobMetadata", } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -219,7 +219,7 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { CheckpointStrategy: "GoSdk", } - check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) _ = check.Offset _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) @@ -263,20 +263,6 @@ func TestShouldParseCheckpointForDefault(t *testing.T) { assert.Equal(t, url.Path, "/DefaultContainer/$Default/0") } -func TestShouldParseCheckpointForBlobMetadataWithCheckpointStrategy(t *testing.T) { - eventHubInfo := EventHubInfo{ - EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", - EventHubConsumerGroup: "$Default", - CheckpointStrategy: "BlobMetadata", - BlobContainer: "containername", - } - - cp := newCheckpointer(eventHubInfo, "0") - url, _ := cp.resolvePath(eventHubInfo) - - assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") -} - func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", @@ -288,7 +274,7 @@ func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { cp := newCheckpointer(eventHubInfo, "0") url, _ := cp.resolvePath(eventHubInfo) - assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") + assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$default/checkpoint/0") } func TestShouldParseCheckpointForGoSdk(t *testing.T) { @@ -305,12 +291,7 @@ func TestShouldParseCheckpointForGoSdk(t *testing.T) { assert.Equal(t, url.Path, "/containername/0") } -func Test(t *testing.T) { - -} - func createNewCheckpointInStorage(urlPath string, containerName string, partitionID string, checkpoint string, metadata map[string]string) (context.Context, error) { - credential, endpoint, _ := ParseAzureStorageBlobConnection(http.DefaultClient, "none", StorageConnectionString, "") // Create container