Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New IBM MQ Scaler #1254

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 218 additions & 0 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/**
* © Copyright IBM Corporation 2020
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IBM Copyright shouldn't be here.

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package scalers

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"

kedautil "github.com/kedacore/keda/pkg/util"
)

// Default variables and settings
const (
ibmMqQueueDepthMetricName = "currentQueueDepth"
defaultTargetQueueDepth = 20
defaultTlsDisabled = false
)

// Assigns IBMMQMetadata struct data pointer to metadata variable
type IBMMQScaler struct {
metadata *IBMMQMetadata
}

// Metadata used by KEDA to query IBM MQ queue depth and scale
type IBMMQMetadata struct {
host string
queueName string
username string
password string
targetQueueDepth int
tlsDisabled bool
}

// Full structured response from MQ admin REST query
type CommandResponse struct {
CommandResponse []Response `json:"commandResponse"`
}

// The body of the response returned from the MQ admin query
type Response struct {
Parameters Parameters `json:"parameters"`
}

// Current depth of the IBM MQ Queue
type Parameters struct {
Curdepth int `json:"curdepth"`
}

// NewIBMMQScaler creates a new IBM MQ scaler
func NewIBMMQScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseIBMMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing IBM MQ metadata: %s", err)
}

return &IBMMQScaler{metadata: meta}, nil
}

func (s *IBMMQScaler) Close() error {
return nil
}

// parseIBMMQMetadata checks the existence of and validates the MQ connection data provided
func parseIBMMQMetadata(config *ScalerConfig) (*IBMMQMetadata, error) {
meta := IBMMQMetadata{}

if val, ok := config.TriggerMetadata["host"]; ok {
_, err := url.ParseRequestURI(val)
if err != nil {
return nil, fmt.Errorf("invalid URL: %s", err)
}
meta.host = val
} else {
return nil, fmt.Errorf("no host URI given")
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
}

if val, ok := config.TriggerMetadata["queueDepth"]; ok && val != "" {
queueDepth, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("invalid targetQueueDepth - must be an integer")
} else {
meta.targetQueueDepth = queueDepth
}
} else {
fmt.Println("No target depth defined - setting default")
meta.targetQueueDepth = defaultTargetQueueDepth
}

if val, ok := config.TriggerMetadata["tls"]; ok {
tlsDisabled, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("invalid tls setting: %s", err)
}
meta.tlsDisabled = tlsDisabled
} else {
fmt.Println("No tls setting defined - setting default")
meta.tlsDisabled = defaultTlsDisabled
}

if val, ok := config.AuthParams["username"]; ok {
meta.username = val
} else {
return nil, fmt.Errorf("no username given")
}

if val, ok := config.AuthParams["password"]; ok {
meta.password = val
} else {
return nil, fmt.Errorf("no password given")
}

return &meta, nil
}

// IsActive returns true if there are messages to be processed/if we need to scale from zero
func (s *IBMMQScaler) IsActive(ctx context.Context) (bool, error) {
queueDepth, err := s.getQueueDepthViaHttp()
if err != nil {
return false, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err)
}
return queueDepth > 0, nil
}

// getQueueDepthViaHttp returns the depth of the MQ Queue from the Admin endpoint
func (s *IBMMQScaler) getQueueDepthViaHttp() (int, error) {

queue := s.metadata.queueName
url := s.metadata.host

var requestJson = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestJson))
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(s.metadata.username, s.metadata.password)

tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: s.metadata.tlsDisabled},
}
client := &http.Client{Transport: tr}

resp, err := client.Do(req)
if err != nil {
panic(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should just return the error rather than panicing the entire process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ahmelsayed, we've included a potential fix for this in PR #1259

}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
Copy link
Contributor

@ahmelsayed ahmelsayed Oct 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: handle ioutil read error in case the body stream got closed in the interim

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ahmelsayed, we've included a potential fix for this in PR #1259


var response CommandResponse
json.Unmarshal(body, &response)
Copy link
Contributor

@ahmelsayed ahmelsayed Oct 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: handle Unmarshal returned error in case body is not a valid JSON

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ahmelsayed, we've included a potential fix for this in PR #1259


return response.CommandResponse[0].Parameters.Curdepth, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might panic if CommandResponse was empty, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ahmelsayed, we've included a potential fix for this in PR #1259

}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *IBMMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueDepth), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s", "IBMMQ", s.metadata.queueName)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metric name should be unique, that means that if you specify two or more IBM MQ scalers in one ScaledObject, each scaler should have a different metric name. I wonder if this is enough?
Is viable a usecase, where we have two MQ scalers with the same queueName but pointing to the different MQ (server)?

},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueueLengthQty,
},
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *IBMMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
queueDepth, err := s.getQueueDepthViaHttp()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting IBM MQ queue depth: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: ibmMqQueueDepthMetricName,
Value: *resource.NewQuantity(int64(queueDepth), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
121 changes: 121 additions & 0 deletions pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* © Copyright IBM Corporation 2020
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dtto

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package scalers

import (
"fmt"
"testing"
)

// Test host URLs for validation
const (
testValidMQQueueURL = "https://qmtest.qm2.eu-gb.mq.appdomain.cloud/ibmmq/rest/v2/admin/action/qmgr/QM1/mqsc"
testInvalidMQQueueURL = "testInvalidURL.com"
)

// Test data struct used for TestIBMMQParseMetadata
type parseIBMMQMetadataTestData struct {
metadata map[string]string
isError bool
authParams map[string]string
}

// Test metric identifier with test MQ data and it's name
type IBMMQMetricIdentifier struct {
metadataTestData *parseIBMMQMetadataTestData
name string
}

// Setting metric identifier mock name
var IBMMQMetricIdentifiers = []IBMMQMetricIdentifier{
{&testIBMMQMetadata[1], "IBMMQ-testQueue"},
}

// Test cases for TestIBMMQParseMetadata test
var testIBMMQMetadata = []parseIBMMQMetadataTestData{
// Nothing passed
{map[string]string{}, true, map[string]string{}},
// Properly formed metadata
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid queueDepth using a string
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// No host provided
{map[string]string{"queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Missing queueName
{map[string]string{"host": testValidMQQueueURL, "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid URL
{map[string]string{"host": testInvalidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed authParams
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// No username provided
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"password": "Pass123"}},
// No password provided
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername"}},
}

// Test MQ Connection metadata is parsed correctly
// should error on missing required field
func TestIBMMQParseMetadata(t *testing.T) {
for _, testData := range testIBMMQMetadata {
_, err := parseIBMMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
fmt.Println(testData)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
fmt.Println(testData)
}
}
}

// Test case for TestParseDefaultQueueDepth test
var testDefaultQueueDepth = []parseIBMMQMetadataTestData{
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
}

// Test that DefaultQueueDepth is set when targetQueueDepth is not provided
func TestParseDefaultQueueDepth(t *testing.T) {
for _, testData := range testDefaultQueueDepth {
metadata, err := parseIBMMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
} else if testData.isError && err == nil {
t.Error("Expected error but got success")
} else if metadata.targetQueueDepth != defaultTargetQueueDepth {
t.Error("Expected default queueDepth =", defaultTargetQueueDepth, "but got", metadata.targetQueueDepth)
}
}
}

// Create a scaler and check if metrics method is available
func TestIBMMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range IBMMQMetricIdentifiers {
metadata, err := parseIBMMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams})

if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockIBMMQScaler := IBMMQScaler{metadata}
metricSpec := mockIBMMQScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name

if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewPubSubScaler(config)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(config)
case "ibmmq":
return scalers.NewIBMMQScaler(config)
case "kafka":
return scalers.NewKafkaScaler(config)
case "liiklus":
Expand Down