diff --git a/go.mod b/go.mod index 747649e1627..9b44a259309 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/go-logr/logr v0.4.0 github.com/go-logr/zapr v0.4.0 // indirect github.com/go-openapi/spec v0.20.3 + github.com/go-playground/assert/v2 v2.0.1 github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.6.0 github.com/golang/mock v1.5.0 diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index b643a43f852..b6033920f2d 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -9,6 +9,7 @@ import ( "strconv" "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/imdario/mergo" kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" "github.com/kedacore/keda/v2/pkg/util" ) @@ -22,18 +23,26 @@ type goCheckpoint struct { PartitionID string `json:"partitionId"` } -// Checkpoint is the object eventhub processor stores in storage -// for checkpointing event processors. This matches the object -// stored by the eventhub C# sdk and Java sdk +type baseCheckpoint struct { + Epoch int64 `json:"Epoch"` + Offset string `json:"Offset"` + Owner string `json:"Owner"` + Token string `json:"Token"` +} + type Checkpoint struct { - Epoch int64 `json:"Epoch"` - Offset string `json:"Offset"` - Owner string `json:"Owner"` - Token string `json:"Token"` + baseCheckpoint PartitionID string `json:"PartitionId"` SequenceNumber int64 `json:"SequenceNumber"` } +// Older python sdk stores the checkpoint differently +type pythonCheckpoint struct { + baseCheckpoint + PartitionID string `json:"partition_id"` + SequenceNumber int64 `json:"sequence_number"` +} + type checkpointer interface { resolvePath(info EventHubInfo) (*url.URL, error) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) @@ -44,7 +53,7 @@ type azureWebjobCheckpointer struct { containerName string } -type defaultCheckpointer struct { +type blobMetadataCheckpointer struct { partitionID string containerName string } @@ -54,6 +63,11 @@ type goSdkCheckpointer struct { containerName string } +type defaultCheckpointer struct { + partitionID string + containerName string +} + // GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { @@ -67,6 +81,11 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { containerName: info.BlobContainer, partitionID: partitionID, } + } else if info.CheckpointType == "BlobMetadata" { + return &blobMetadataCheckpointer{ + containerName: info.BlobContainer, + partitionID: partitionID, + } } else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" { return &azureWebjobCheckpointer{ containerName: "azure-webjobs-eventhub", @@ -80,63 +99,100 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer { } } +// resolve path for AzureWebJobCheckpointer func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { eventHubNamespace, eventHubName, err := getHubAndNamespace(info) if err != nil { return nil, err } - // URL format - /azure-webjobs-eventhub//// path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) return path, nil } -func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { +// extract checkpoint for AzureWebJobCheckpointer +func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint Checkpoint + err := readToCheckpointFromBody(get, &checkpoint) + if err != nil { + return Checkpoint{}, err + } + + return checkpoint, nil +} + +// resolve path for BlobMetadataCheckpointer +func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { eventHubNamespace, eventHubName, err := getHubAndNamespace(info) if err != nil { return nil, err } - // URL format - /azure-webjobs-eventhub//// path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID)) return path, nil } -// Resolve Path for AzureWebJob Checkpoint +// extract checkpoint for BlobMetadataCheckpointer +func (checkpointer *blobMetadataCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + return getCheckpointFromStorageMetadata(get, checkpointer.partitionID) +} + +// resolve path for GoSdkCheckpointer func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID)) return path, nil } -func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { - var checkpoint Checkpoint +// extract checkpoint for GoSdkCheckpointer +func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint goCheckpoint err := readToCheckpointFromBody(get, &checkpoint) if err != nil { return Checkpoint{}, err } - return checkpoint, nil + return Checkpoint{ + SequenceNumber: checkpoint.Checkpoint.SequenceNumber, + baseCheckpoint: baseCheckpoint{ + Offset: checkpoint.Checkpoint.Offset, + }, + PartitionID: checkpoint.PartitionID, + }, nil } -func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { - return getCheckpointFromStorageMetadata(get, checkpointer.partitionID) +// resolve path for DefaultCheckpointer +func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) { + path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, checkpointer.partitionID)) + + return path, nil } -func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { - var checkpoint goCheckpoint - err := readToCheckpointFromBody(get, &checkpoint) - if err != nil { - return Checkpoint{}, err +//extract checkpoint with deprecated Python sdk checkpoint for backward compatibility +func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { + var checkpoint Checkpoint + var pyCheckpoint pythonCheckpoint + blobData := &bytes.Buffer{} + + reader := get.Body(azblob.RetryReaderOptions{}) + if _, err := blobData.ReadFrom(reader); err != nil { + return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err) } + defer reader.Close() // The client must close the response body when finished with it - return Checkpoint{ - SequenceNumber: checkpoint.Checkpoint.SequenceNumber, - Offset: checkpoint.Checkpoint.Offset, - PartitionID: checkpoint.PartitionID, - }, nil + if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil { + return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) + } + + if err := json.Unmarshal(blobData.Bytes(), &pyCheckpoint); err != nil { + return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err) + } + + err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint)) + + return checkpoint, err } func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) { diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index a21a6cd4b05..8f53f3ad23f 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -37,7 +37,9 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - Offset: offset, + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -55,7 +57,89 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) { assert.Equal(t, check, expectedCheckpoint) } -func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { +func TestCheckpointFromBlobStorageDefault(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1005" + consumerGroup := "$Default" + + sequencenumber := int64(1) + + containerName := "defaultcontainer" + checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T) { + if StorageConnectionString == "" { + return + } + + partitionID := "0" + offset := "1006" + consumerGroup := "$Default" + + sequencenumber := int64(1) + + containerName := "defaultcontainerpython" + checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + urlPath := fmt.Sprintf("%s/", consumerGroup) + + ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) + assert.Equal(t, err, nil) + + expectedCheckpoint := Checkpoint{ + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, + PartitionID: partitionID, + SequenceNumber: sequencenumber, + } + + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub", + StorageConnection: StorageConnectionString, + EventHubConsumerGroup: consumerGroup, + EventHubName: "hub", + BlobContainer: containerName, + } + + check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") + _ = check.Offset + _ = expectedCheckpoint.Offset + assert.Equal(t, check, expectedCheckpoint) +} + +func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { if StorageConnectionString == "" { return } @@ -71,14 +155,16 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { "sequencenumber": strconv.FormatInt(sequencenumber, 10), } - containerName := "defaultcontainer" + containerName := "blobmetadatacontainer" urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/checkpoint/", consumerGroup) ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, "", metadata) assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - Offset: offset, + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -89,6 +175,7 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) { EventHubConsumerGroup: consumerGroup, EventHubName: "hub", BlobContainer: containerName, + CheckpointType: "BlobMetadata", } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") @@ -118,7 +205,9 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - Offset: offset, + baseCheckpoint: baseCheckpoint{ + Offset: offset, + }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -162,11 +251,24 @@ func TestShouldParseCheckpointForWebJobWithCheckpointType(t *testing.T) { assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0") } -func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) { +func TestShouldParseCheckpointForDefault(t *testing.T) { + eventHubInfo := EventHubInfo{ + EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", + EventHubConsumerGroup: "$Default", + BlobContainer: "DefaultContainer", + } + + cp := newCheckpointer(eventHubInfo, "0") + url, _ := cp.resolvePath(eventHubInfo) + + assert.Equal(t, url.Path, "/DefaultContainer/$Default/0") +} + +func TestShouldParseCheckpointForBlobMetadataWithCheckpointType(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", - CheckpointType: "Default", + CheckpointType: "BlobMetadata", BlobContainer: "containername", } @@ -176,11 +278,12 @@ func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) { assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0") } -func TestShouldParseCheckpointForDefault(t *testing.T) { +func TestShouldParseCheckpointForBlobMetadata(t *testing.T) { eventHubInfo := EventHubInfo{ EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test", EventHubConsumerGroup: "$Default", BlobContainer: "containername", + CheckpointType: "BlobMetadata", } cp := newCheckpointer(eventHubInfo, "0")