Skip to content

Commit

Permalink
add default checkpointer for backward compatibility to azure eventhub…
Browse files Browse the repository at this point in the history
… scaler

Signed-off-by: Christian Leinweber <christian.leinweber@maibornwolff.de>
  • Loading branch information
christle committed Apr 11, 2021
1 parent 92e6439 commit 19c38ed
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 37 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/go-logr/logr v0.4.0
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-openapi/spec v0.20.3
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/mock v1.5.0
Expand Down
112 changes: 84 additions & 28 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/imdario/mergo"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/util"
)
Expand All @@ -22,18 +23,27 @@ type goCheckpoint struct {
PartitionID string `json:"partitionId"`
}

// Checkpoint is the object eventhub processor stores in storage
// for checkpointing event processors. This matches the object
// stored by the eventhub C# sdk and Java sdk
type baseCheckpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
}

// Checkpoint in a common format
type Checkpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
baseCheckpoint
PartitionID string `json:"PartitionId"`
SequenceNumber int64 `json:"SequenceNumber"`
}

// Older python sdk stores the checkpoint differently
type pythonCheckpoint struct {
baseCheckpoint
PartitionID string `json:"partition_id"`
SequenceNumber int64 `json:"sequence_number"`
}

type checkpointer interface {
resolvePath(info EventHubInfo) (*url.URL, error)
extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error)
Expand All @@ -44,7 +54,7 @@ type azureWebjobCheckpointer struct {
containerName string
}

type defaultCheckpointer struct {
type blobMetadataCheckpointer struct {
partitionID string
containerName string
}
Expand All @@ -54,9 +64,13 @@ type goSdkCheckpointer struct {
containerName string
}

type defaultCheckpointer struct {
partitionID string
containerName string
}

// GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {

checkpointer := newCheckpointer(info, partitionID)
return getCheckpoint(ctx, httpClient, info, checkpointer)
}
Expand All @@ -67,6 +81,11 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
containerName: info.BlobContainer,
partitionID: partitionID,
}
} else if info.CheckpointType == "BlobMetadata" {
return &blobMetadataCheckpointer{
containerName: info.BlobContainer,
partitionID: partitionID,
}
} else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" {
return &azureWebjobCheckpointer{
containerName: "azure-webjobs-eventhub",
Expand All @@ -80,63 +99,100 @@ func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
}
}

// resolve path for AzureWebJobCheckpointer
func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
if err != nil {
return nil, err
}

// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))

return path, nil
}

func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
// extract checkpoint for AzureWebJobCheckpointer
func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint Checkpoint
err := readToCheckpointFromBody(get, &checkpoint)
if err != nil {
return Checkpoint{}, err
}

return checkpoint, nil
}

// resolve path for BlobMetadataCheckpointer
func (checkpointer *blobMetadataCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
if err != nil {
return nil, err
}

// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))

return path, nil
}

// Resolve Path for AzureWebJob Checkpoint
// extract checkpoint for BlobMetadataCheckpointer
func (checkpointer *blobMetadataCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
return getCheckpointFromStorageMetadata(get, checkpointer.partitionID)
}

// resolve path for GoSdkCheckpointer
func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID))

return path, nil
}

func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint Checkpoint
// extract checkpoint for GoSdkCheckpointer
func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint goCheckpoint
err := readToCheckpointFromBody(get, &checkpoint)
if err != nil {
return Checkpoint{}, err
}

return checkpoint, nil
return Checkpoint{
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
baseCheckpoint: baseCheckpoint{
Offset: checkpoint.Checkpoint.Offset,
},
PartitionID: checkpoint.PartitionID,
}, nil
}

func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
return getCheckpointFromStorageMetadata(get, checkpointer.partitionID)
// resolve path for DefaultCheckpointer
func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, checkpointer.partitionID))

return path, nil
}

func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint goCheckpoint
err := readToCheckpointFromBody(get, &checkpoint)
if err != nil {
return Checkpoint{}, err
// extract checkpoint with deprecated Python sdk checkpoint for backward compatibility
func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint Checkpoint
var pyCheckpoint pythonCheckpoint
blobData := &bytes.Buffer{}

reader := get.Body(azblob.RetryReaderOptions{})
if _, err := blobData.ReadFrom(reader); err != nil {
return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err)
}
defer reader.Close() // The client must close the response body when finished with it

return Checkpoint{
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
Offset: checkpoint.Checkpoint.Offset,
PartitionID: checkpoint.PartitionID,
}, nil
if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil {
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
}

if err := json.Unmarshal(blobData.Bytes(), &pyCheckpoint); err != nil {
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
}

err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint))

return checkpoint, err
}

func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {
Expand Down
120 changes: 111 additions & 9 deletions pkg/scalers/azure/azure_eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) {
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
Offset: offset,
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -55,7 +57,89 @@ func TestCheckpointFromBlobStorageAzureWebjob(t *testing.T) {
assert.Equal(t, check, expectedCheckpoint)
}

func TestCheckpointFromBlobStorageWithDefault(t *testing.T) {
func TestCheckpointFromBlobStorageDefault(t *testing.T) {
if StorageConnectionString == "" {
return
}

partitionID := "0"
offset := "1005"
consumerGroup := "$Default"

sequencenumber := int64(1)

containerName := "defaultcontainer"
checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}

eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub",
StorageConnection: StorageConnectionString,
EventHubConsumerGroup: consumerGroup,
EventHubName: "hub",
BlobContainer: containerName,
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T) {
if StorageConnectionString == "" {
return
}

partitionID := "0"
offset := "1006"
consumerGroup := "$Default"

sequencenumber := int64(1)

containerName := "defaultcontainerpython"
checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}"
checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID)
urlPath := fmt.Sprintf("%s/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}

eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub",
StorageConnection: StorageConnectionString,
EventHubConsumerGroup: consumerGroup,
EventHubName: "hub",
BlobContainer: containerName,
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
_ = check.Offset
_ = expectedCheckpoint.Offset
assert.Equal(t, check, expectedCheckpoint)
}

func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) {
if StorageConnectionString == "" {
return
}
Expand All @@ -71,14 +155,16 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) {
"sequencenumber": strconv.FormatInt(sequencenumber, 10),
}

containerName := "defaultcontainer"
containerName := "blobmetadatacontainer"
urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/checkpoint/", consumerGroup)

ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, "", metadata)
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
Offset: offset,
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand All @@ -89,6 +175,7 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) {
EventHubConsumerGroup: consumerGroup,
EventHubName: "hub",
BlobContainer: containerName,
CheckpointType: "BlobMetadata",
}

check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0")
Expand All @@ -98,7 +185,6 @@ func TestCheckpointFromBlobStorageWithDefault(t *testing.T) {
}

func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {

if StorageConnectionString == "" {
return
}
Expand All @@ -118,7 +204,9 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) {
assert.Equal(t, err, nil)

expectedCheckpoint := Checkpoint{
Offset: offset,
baseCheckpoint: baseCheckpoint{
Offset: offset,
},
PartitionID: partitionID,
SequenceNumber: sequencenumber,
}
Expand Down Expand Up @@ -162,11 +250,24 @@ func TestShouldParseCheckpointForWebJobWithCheckpointType(t *testing.T) {
assert.Equal(t, url.Path, "/azure-webjobs-eventhub/eventhubnamespace.servicebus.windows.net/hub-test/$Default/0")
}

func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) {
func TestShouldParseCheckpointForDefault(t *testing.T) {
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
EventHubConsumerGroup: "$Default",
BlobContainer: "DefaultContainer",
}

cp := newCheckpointer(eventHubInfo, "0")
url, _ := cp.resolvePath(eventHubInfo)

assert.Equal(t, url.Path, "/DefaultContainer/$Default/0")
}

func TestShouldParseCheckpointForBlobMetadataWithCheckpointType(t *testing.T) {
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
EventHubConsumerGroup: "$Default",
CheckpointType: "Default",
CheckpointType: "BlobMetadata",
BlobContainer: "containername",
}

Expand All @@ -176,11 +277,12 @@ func TestShouldParseCheckpointForDefaultWithCheckpointType(t *testing.T) {
assert.Equal(t, url.Path, "/containername/eventhubnamespace.servicebus.windows.net/hub-test/$Default/checkpoint/0")
}

func TestShouldParseCheckpointForDefault(t *testing.T) {
func TestShouldParseCheckpointForBlobMetadata(t *testing.T) {
eventHubInfo := EventHubInfo{
EventHubConnection: "Endpoint=sb://eventhubnamespace.servicebus.windows.net/;EntityPath=hub-test",
EventHubConsumerGroup: "$Default",
BlobContainer: "containername",
CheckpointType: "BlobMetadata",
}

cp := newCheckpointer(eventHubInfo, "0")
Expand Down

0 comments on commit 19c38ed

Please sign in to comment.