Skip to content

Commit

Permalink
language agnostic checkpointing for azure eventhub scaler (#1621)
Browse files Browse the repository at this point in the history
  • Loading branch information
christle authored May 5, 2021
1 parent 24dc0a8 commit c2ad43e
Show file tree
Hide file tree
Showing 7 changed files with 619 additions and 127 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Fixed goroutine leaks in usage of timers ([#1704](https://github.com/kedacore/keda/pull/1704) | [#1739](https://github.com/kedacore/keda/pull/1739))
- Setting timeouts in the HTTP client used by the IBM MQ scaler ([#1758](https://github.com/kedacore/keda/pull/1758))
- Fix cleanup of removed triggers ([#1768](https://github.com/kedacore/keda/pull/1768))
- Eventhub Scaler: Add trigger parameter `checkpointStrategy` to support more language-specific checkpoints ([#1621](https://github.com/kedacore/keda/pull/1621))

### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/go-logr/logr v0.4.0
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-openapi/spec v0.20.3
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/mock v1.5.0
Expand Down
119 changes: 18 additions & 101 deletions pkg/scalers/azure/azure_eventhub.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,15 @@
package azure

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"

"github.com/imdario/mergo"

"github.com/Azure/azure-amqp-common-go/v3/aad"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/azure"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/util"
)

// baseCheckpoint holds the JSON fields that are spelled the same way in both
// checkpoint layouts; it is embedded by Checkpoint and pythonCheckpoint.
type baseCheckpoint struct {
Epoch int64 `json:"Epoch"`
Offset string `json:"Offset"`
Owner string `json:"Owner"`
Token string `json:"Token"`
}

// Checkpoint is the object the eventhub processor stores in storage
// for checkpointing event processors. This matches the object
// stored by the eventhub C# sdk and Java sdk, which use PascalCase
// JSON keys for the partition id and sequence number.
type Checkpoint struct {
baseCheckpoint
PartitionID string `json:"PartitionId"`
SequenceNumber int64 `json:"SequenceNumber"`
}

// pythonCheckpoint is the checkpoint layout stored by the Eventhub python sdk.
// It carries the same fields as Checkpoint, but the partition id and sequence
// number use snake_case JSON keys.
type pythonCheckpoint struct {
baseCheckpoint
PartitionID string `json:"partition_id"`
SequenceNumber int64 `json:"sequence_number"`
}

// EventHubInfo to keep event hub connection and resources
type EventHubInfo struct {
EventHubConnection string
Expand All @@ -51,6 +18,7 @@ type EventHubInfo struct {
BlobContainer string
Namespace string
EventHubName string
CheckpointStrategy string
}

// GetEventHubClient returns eventhub client
Expand Down Expand Up @@ -80,74 +48,6 @@ func GetEventHubClient(info EventHubInfo) (*eventhub.Hub, error) {
return nil, aadErr
}

// GetCheckpointFromBlobStorage accesses Blob storage and gets checkpoint information of a partition.
//
// The blob that is read depends on info.BlobContainer:
//   - set:   <storageEndpoint>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
//   - empty: <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
//
// The namespace and hub name are parsed from info.EventHubConnection when it is
// non-empty, otherwise info.Namespace and info.EventHubName are used.
// On any failure a zero Checkpoint is returned together with the error.
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {
	blobCreds, storageEndpoint, err := ParseAzureStorageBlobConnection(httpClient, kedav1alpha1.PodIdentityProviderNone, info.StorageConnection, "")
	if err != nil {
		return Checkpoint{}, err
	}

	// The connection string is validated even when a blob container is
	// configured, so a malformed connection string is always reported.
	var eventHubNamespace string
	var eventHubName string
	if info.EventHubConnection != "" {
		eventHubNamespace, eventHubName, err = ParseAzureEventHubConnectionString(info.EventHubConnection)
		if err != nil {
			return Checkpoint{}, err
		}
	} else {
		eventHubNamespace = info.Namespace
		eventHubName = info.EventHubName
	}

	// TODO: add more ways to read from different types of storage and read checkpoints/leases written in different JSON formats
	var blobPath string
	if info.BlobContainer != "" {
		// Checking blob store for C# and Java applications
		// URL format - <storageEndpoint>/<blobContainer>/<eventHubConsumerGroup>/<partitionID>
		blobPath = fmt.Sprintf("/%s/%s/%s", info.BlobContainer, info.EventHubConsumerGroup, partitionID)
	} else {
		// Checking blob store for Azure functions
		// URL format - <storageEndpoint>/azure-webjobs-eventhub/<eventHubNamespace>/<eventHubName>/<eventHubConsumerGroup>/<partitionID>
		blobPath = fmt.Sprintf("/azure-webjobs-eventhub/%s/%s/%s/%s", eventHubNamespace, eventHubName, info.EventHubConsumerGroup, partitionID)
	}

	// A path that fails to parse would otherwise yield a nil reference and
	// panic inside ResolveReference, so surface it as an error instead.
	path, err := url.Parse(blobPath)
	if err != nil {
		return Checkpoint{}, fmt.Errorf("unable to build checkpoint blob path: %w", err)
	}
	baseURL := storageEndpoint.ResolveReference(path)

	// Create a BlockBlobURL object to a blob in the container.
	blobURL := azblob.NewBlockBlobURL(*baseURL, azblob.NewPipeline(blobCreds, azblob.PipelineOptions{}))

	get, err := blobURL.Download(ctx, 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
	if err != nil {
		return Checkpoint{}, fmt.Errorf("unable to download file from blob storage: %w", err)
	}

	reader := get.Body(azblob.RetryReaderOptions{})
	// The client must close the response body when finished with it; register
	// the close before reading so a failed read does not leak the body.
	defer reader.Close()

	blobData := &bytes.Buffer{}
	if _, err := blobData.ReadFrom(reader); err != nil {
		return Checkpoint{}, fmt.Errorf("failed to read blob data: %w", err)
	}

	return getCheckpoint(blobData.Bytes())
}

// getCheckpoint decodes raw checkpoint blob contents into a Checkpoint.
//
// The payload is unmarshalled twice, once with the Checkpoint JSON keys and
// once with the pythonCheckpoint keys, and the two results are merged with
// mergo.Merge so that fields left empty by the first decoding are filled from
// the second.
// A zero Checkpoint is returned together with the error if decoding or merging fails.
func getCheckpoint(data []byte) (Checkpoint, error) {
	var checkpoint Checkpoint
	var pyCheckpoint pythonCheckpoint

	if err := json.Unmarshal(data, &checkpoint); err != nil {
		return Checkpoint{}, fmt.Errorf("failed to decode blob data: %w", err)
	}

	if err := json.Unmarshal(data, &pyCheckpoint); err != nil {
		return Checkpoint{}, fmt.Errorf("failed to decode blob data: %w", err)
	}

	// The two struct types differ only in their field tags, so the python
	// variant can be converted directly before merging.
	if err := mergo.Merge(&checkpoint, Checkpoint(pyCheckpoint)); err != nil {
		return Checkpoint{}, fmt.Errorf("failed to merge checkpoint data: %w", err)
	}

	return checkpoint, nil
}

// ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name)
// Connection string should be in following format:
// Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name
Expand Down Expand Up @@ -177,3 +77,20 @@ func ParseAzureEventHubConnectionString(connectionString string) (string, string

return eventHubNamespace, eventHubName, nil
}

// getHubAndNamespace returns the Event Hub namespace and hub name for info,
// in that order.
//
// When info.EventHubConnection is non-empty it is parsed with
// ParseAzureEventHubConnectionString and a parse failure is returned with two
// empty strings. Otherwise info.Namespace and info.EventHubName are returned
// unchanged.
func getHubAndNamespace(info EventHubInfo) (string, string, error) {
	// No connection string: fall back to the explicitly configured values.
	if info.EventHubConnection == "" {
		return info.Namespace, info.EventHubName, nil
	}

	namespace, hubName, parseErr := ParseAzureEventHubConnectionString(info.EventHubConnection)
	if parseErr != nil {
		return "", "", parseErr
	}

	return namespace, hubName, nil
}
Loading

0 comments on commit c2ad43e

Please sign in to comment.