Skip to content

Commit

Permalink
language agnostic checkpointing for azure eventhub scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Leinweber <christian.leinweber@maibornwolff.de>
  • Loading branch information
christle committed Feb 17, 2021
1 parent dfcbba9 commit b86d821
Show file tree
Hide file tree
Showing 6 changed files with 481 additions and 127 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/go-logr/logr v0.4.0
github.com/go-logr/zapr v0.3.0 // indirect
github.com/go-openapi/spec v0.20.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/mock v1.4.4
Expand Down
119 changes: 18 additions & 101 deletions pkg/scalers/azure/azure_eventhub.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,15 @@
package azure

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"

"github.com/imdario/mergo"

"github.com/Azure/azure-amqp-common-go/v3/aad"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/util"
)

type baseCheckpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
}

// Checkpoint is the object eventhub processor stores in storage
// for checkpointing event processors. This matches the object
// stored by the eventhub C# sdk and Java sdk
type Checkpoint struct {
baseCheckpoint
PartitionID string `json:"PartitionId"`
SequenceNumber int64 `json:"SequenceNumber"`
}

// Eventhub python sdk stores the checkpoint differently
type pythonCheckpoint struct {
baseCheckpoint
PartitionID string `json:"partition_id"`
SequenceNumber int64 `json:"sequence_number"`
}

// EventHubInfo to keep event hub connection and resources
type EventHubInfo struct {
EventHubConnection string
Expand All @@ -51,6 +18,7 @@ type EventHubInfo struct {
BlobContainer string
Namespace string
EventHubName string
CheckpointType string
}

// GetEventHubClient returns eventhub client
Expand Down Expand Up @@ -80,74 +48,6 @@ func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) {
return nil, aadErr
}

// GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {
blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "")
if err != nil {
return Checkpoint{}, err
}

var eventHubNamespace string
var eventHubName string
if info.EventHubConnection != "" {
eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
if err != nil {
return Checkpoint{}, err
}
} else {
eventHubNamespace = info.Namespace
eventHubName = info.EventHubName
}

// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
var baseURL *url.URL
// Checking blob store for C# and Java applications
if info.BlobContainer != "" {
// URL format - <storageEndpoint>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, partitionID))
baseURL = storageEndpoint.ResolveReference(path)
} else {
// Checking blob store for Azure functions
// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/azure-webjobs-eventhub/%s/%s/%s/%s", eventHubNamespace, eventHubName, info.EventHubConsumerGroup, partitionID))
baseURL = storageEndpoint.ResolveReference(path)
}

// Create a BlockBlobURL object to a blob in the container.
blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{}))

get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
if err != nil {
return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err)
}

blobData := &bytes.Buffer{}
reader := get.Body(azblob.RetryReaderOptions{})
if _, err := blobData.ReadFrom(reader); err != nil {
return Checkpoint{}, fmt.Errorf("failed to read blob data: %s", err)
}
defer reader.Close() // The client must close the response body when finished with it

return getCheckpoint(blobData.Bytes())
}

func getCheckpoint(bytes []byte) (Checkpoint, error) {
var checkpoint Checkpoint
var pyCheckpoint pythonCheckpoint

if err := json.Unmarshal(bytes, &checkpoint); err != nil {
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
}

if err := json.Unmarshal(bytes, &pyCheckpoint); err != nil {
return Checkpoint{}, fmt.Errorf("failed to decode blob data: %s", err)
}

err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint))

return checkpoint, err
}

// ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name)
// Connection string should be in following format:
// Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name
Expand Down Expand Up @@ -177,3 +77,20 @@ func ParseAzureEventHubConnectionString(connectionString string) (string, string

return eventHubNamespace, eventHubName, nil
}

func getHubAndNamespace(info EventHubInfo) (string, string, error) {
var eventHubNamespace string
var eventHubName string
var err error
if info.EventHubConnection != "" {
eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
if err != nil {
return "", "", err
}
} else {
eventHubNamespace = info.Namespace
eventHubName = info.EventHubName
}

return eventHubNamespace, eventHubName, nil
}
220 changes: 220 additions & 0 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package azure

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/url"
"strconv"

"github.com/Azure/azure-storage-blob-go/azblob"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/util"
)

// goCheckpoint struct to adapt GoSdk Checkpoint
type goCheckpoint struct {
Checkpoint struct {
SequenceNumber int64 `json:"sequenceNumber"`
Offset string `json:"offset"`
} `json:"checkpoint"`
PartitionID string `json:"partitionId"`
}

// Checkpoint is the object eventhub processor stores in storage
// for checkpointing event processors. This matches the object
// stored by the eventhub C# sdk and Java sdk
type Checkpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
PartitionID string `json:"PartitionId"`
SequenceNumber int64 `json:"SequenceNumber"`
}

type checkpointer interface {
resolvePath(info EventHubInfo) (*url.URL, error)
extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error)
}

type azureWebjobCheckpointer struct {
partitionID string
containerName string
}

type defaultCheckpointer struct {
partitionID string
containerName string
}

type goSdkCheckpointer struct {
partitionID string
containerName string
}

// GetCheckpointFromBlobStorage reads depending of the CheckpointType the checkpoint from a azure storage
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {

checkpointer := newCheckpointer(info, partitionID)
return getCheckpoint(ctx, httpClient, info, checkpointer)
}

func newCheckpointer(info EventHubInfo, partitionID string) checkpointer {
if info.CheckpointType == "GoSdk" {
return &goSdkCheckpointer{
containerName: info.BlobContainer,
partitionID: partitionID,
}
} else if info.CheckpointType == "AzureWebJob" || info.BlobContainer == "" {
return &azureWebjobCheckpointer{
containerName: "azure-webjobs-eventhub",
partitionID: partitionID,
}
} else {
return &defaultCheckpointer{
containerName: info.BlobContainer,
partitionID: partitionID,
}
}
}

func (checkpointer *azureWebjobCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
if err != nil {
return nil, err
}

// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))

return path, nil
}

func (checkpointer *defaultCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
eventHubNamespace, eventHubName, err := getHubAndNamespace(info)
if err != nil {
return nil, err
}

// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
path, _ := url.Parse(fmt.Sprintf("/%s/%s/%s/%s/checkpoint/%s", checkpointer.containerName, eventHubNamespace, eventHubName, info.EventHubConsumerGroup, checkpointer.partitionID))

return path, nil
}

// Resolve Path for AzureWebJob Checkpoint
func (checkpointer *goSdkCheckpointer) resolvePath(info EventHubInfo) (*url.URL, error) {
path, _ := url.Parse(fmt.Sprintf("/%s/%s", info.BlobContainer, checkpointer.partitionID))

return path, nil
}

func (checkpointer *azureWebjobCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint Checkpoint
err := readToCheckpointFromBody(get, &checkpoint)
if err != nil {
return Checkpoint{}, err
}

return checkpoint, nil
}

func (checkpointer *defaultCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
return getCheckpointFromStorageMetadata(get, checkpointer.partitionID)
}

func (checkpointer *goSdkCheckpointer) extractCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) {
var checkpoint goCheckpoint
err := readToCheckpointFromBody(get, &checkpoint)
if err != nil {
return Checkpoint{}, err
}

return Checkpoint{
SequenceNumber: checkpoint.Checkpoint.SequenceNumber,
Offset: checkpoint.Checkpoint.Offset,
PartitionID: checkpoint.PartitionID,
}, nil
}

func getCheckpoint(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, checkpointer checkpointer) (Checkpoint, error) {
blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "")
if err != nil {
return Checkpoint{}, err
}

path, err := checkpointer.resolvePath(info)
if err != nil {
return Checkpoint{}, err
}

baseURL := storageEndpoint.ResolveReference(path)

get, err := downloadBlob(ctx, baseURL, blobCreds)
if err != nil {
return Checkpoint{}, err
}

return checkpointer.extractCheckpoint(get)
}

func getCheckpointFromStorageMetadata(get *azblob.DownloadResponse, partitionID string) (Checkpoint, error) {
checkpoint := Checkpoint{
PartitionID: partitionID,
}

metadata := get.NewMetadata()

if sequencenumber, ok := metadata["sequencenumber"]; ok {
if !ok {
if sequencenumber, ok = metadata["Sequencenumber"]; !ok {
return Checkpoint{}, fmt.Errorf("sequencenumber on blob not found")
}
}

if sn, err := strconv.ParseInt(sequencenumber, 10, 64); err == nil {
checkpoint.SequenceNumber = sn
} else {
return Checkpoint{}, fmt.Errorf("sequencenumber is not a valid int64 value: %w", err)
}
}

if offset, ok := metadata["offset"]; ok {
if !ok {
if offset, ok = metadata["Offset"]; !ok {
return Checkpoint{}, fmt.Errorf("offset on blob not found")
}
}
checkpoint.Offset = offset
}

return checkpoint, nil
}

func readToCheckpointFromBody(get *azblob.DownloadResponse, checkpoint interface{}) error {
blobData := &bytes.Buffer{}

reader := get.Body(azblob.RetryReaderOptions{})
if _, err := blobData.ReadFrom(reader); err != nil {
return fmt.Errorf("failed to read blob data: %s", err)
}
defer reader.Close() // The client must close the response body when finished with it

if err := json.Unmarshal(blobData.Bytes(), &checkpoint); err != nil {
return fmt.Errorf("failed to decode blob data: %s", err)
}

return nil
}

func downloadBlob(ctx context.Context, baseURL *url.URL, blobCreds azblob.Credential) (*azblob.DownloadResponse, error) {
blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{}))

get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
if err != nil {
return nil, fmt.Errorf("unable to download file from blob storage: %w", err)
}
return get, nil
}
Loading

0 comments on commit b86d821

Please sign in to comment.