Skip to content

Commit

Permalink
feat(scaler): support cloudID and apiKey in elasticsearch scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Rogerio Peixoto <rcbpeixoto@gmail.com>
  • Loading branch information
rcbop authored and Rogerio Peixoto committed Nov 20, 2022
1 parent f8ea5dd commit d8b43fd
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **AWS Scalers**: Add setting AWS endpoint url. ([#3337](https://github.com/kedacore/keda/issues/3337))
- **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
- **Azure Service Bus Scaler:** Support regex usage in queueName / subscriptionName parameters. ([#1624](https://github.com/kedacore/keda/issues/1624))
- **ElasticSearch Scaler**: Support for ElasticSearch Service on Elastic Cloud ([#3785]https://github.com/kedacore/keda/issues/3785)
- **Selenium Grid Scaler:** Allow setting url trigger parameter from TriggerAuthentication/ClusterTriggerAuthentication ([#3752](https://github.com/kedacore/keda/pull/3752))

### Improvements
Expand Down
118 changes: 98 additions & 20 deletions pkg/scalers/elasticsearch_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type elasticsearchMetadata struct {
unsafeSsl bool
username string
password string
cloudID string
apiKey string
indexes []string
searchTemplateName string
parameters []string
Expand Down Expand Up @@ -70,25 +72,36 @@ func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) {

const defaultUnsafeSsl = false

func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}
func hasCloudConfig(meta *elasticsearchMetadata) bool {
if meta.cloudID != "" {
return true
}
if meta.apiKey != "" {
return true
}
return false
}

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return nil, err
func hasEndpointsConfig(meta *elasticsearchMetadata) bool {
if len(meta.addresses) > 0 {
return true
}
meta.addresses = splitAndTrimBySep(addresses, ",")
if meta.username != "" {
return true
}
if meta.password != "" {
return true
}
return false
}

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
meta.unsafeSsl, err = strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err)
}
} else {
meta.unsafeSsl = defaultUnsafeSsl
func extractEndpointsConfig(config *ScalerConfig, meta *elasticsearchMetadata) error {
addresses, err := GetFromAuthOrMeta(config, "addresses")
if err != nil {
return err
}

meta.addresses = splitAndTrimBySep(addresses, ",")
if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok {
Expand All @@ -101,6 +114,60 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

return nil
}

func extractCloudConfig(config *ScalerConfig, meta *elasticsearchMetadata) error {
cloudID, err := GetFromAuthOrMeta(config, "cloudID")
if err != nil {
return err
}
meta.cloudID = cloudID

apiKey, err := GetFromAuthOrMeta(config, "apiKey")
if err != nil {
return err
}
meta.apiKey = apiKey
return nil
}

func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) {
meta := elasticsearchMetadata{}

var err error
addresses, err := GetFromAuthOrMeta(config, "addresses")
cloudID, errCloudConfig := GetFromAuthOrMeta(config, "cloudID")
if err != nil && errCloudConfig != nil {
return nil, fmt.Errorf("must provide either endpoint addresses or cloud config")
}

if err == nil && addresses != "" {
err = extractEndpointsConfig(config, &meta)
if err != nil {
return nil, err
}
}
if errCloudConfig == nil && cloudID != "" {
err = extractCloudConfig(config, &meta)
if err != nil {
return nil, err
}
}

if hasEndpointsConfig(&meta) && hasCloudConfig(&meta) {
return nil, fmt.Errorf("can't provide endpoint addresses and cloud config at the same time")
}

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
meta.unsafeSsl, err = strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("error parsing unsafeSsl: %s", err)
}
} else {
meta.unsafeSsl = defaultUnsafeSsl
}

index, err := GetFromAuthOrMeta(config, "index")
if err != nil {
return nil, err
Expand Down Expand Up @@ -144,12 +211,23 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e

// newElasticsearchClient creates elasticsearch db connection
func newElasticsearchClient(meta *elasticsearchMetadata, logger logr.Logger) (*elasticsearch.Client, error) {
config := elasticsearch.Config{Addresses: meta.addresses}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
var config elasticsearch.Config

if hasCloudConfig(meta) {
config = elasticsearch.Config{
CloudID: meta.cloudID,
APIKey: meta.apiKey,
}
} else {
config = elasticsearch.Config{
Addresses: meta.addresses,
}
if meta.username != "" {
config.Username = meta.username
}
if meta.password != "" {
config.Password = meta.password
}
}

transport := http.DefaultTransport.(*http.Transport)
Expand Down
20 changes: 18 additions & 2 deletions pkg/scalers/elasticsearch_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,22 @@ type elasticsearchMetricIdentifier struct {

var testCases = []parseElasticsearchMetadataTestData{
{
name: "no addresses given",
name: "must provide either endpoint addresses or cloud config",
metadata: map[string]string{},
authParams: map[string]string{},
expectedError: errors.New("no addresses given"),
expectedError: errors.New("must provide either endpoint addresses or cloud config"),
},
{
name: "no apiKey given",
metadata: map[string]string{"cloudID": "my-cluster:xxxxxxxxxxx"},
authParams: map[string]string{},
expectedError: errors.New("no apiKey given"),
},
{
name: "can't provide endpoint addresses and cloud config at the same time",
metadata: map[string]string{"addresses": "http://localhost:9200", "cloudID": "my-cluster:xxxxxxxxxxx"},
authParams: map[string]string{"username": "admin", "apiKey": "xxxxxxxxx"},
expectedError: errors.New("can't provide endpoint addresses and cloud config at the same time"),
},
{
name: "no index given",
Expand Down Expand Up @@ -447,6 +459,10 @@ func TestElasticsearchGetMetricSpecForScaling(t *testing.T) {
AuthParams: testData.metadataTestData.authParams,
ScalerIndex: testData.scalerIndex,
})
if testData.metadataTestData.expectedError != nil {
assert.Equal(t, err, testData.metadataTestData.expectedError)
continue
}
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
Expand Down

0 comments on commit d8b43fd

Please sign in to comment.