Skip to content

Commit

Permalink
Add custom http timeout in RabbitMQ Scaler (#2086)
Browse files Browse the repository at this point in the history
Signed-off-by: jorturfer <jorge_turrado@hotmail.es>
  • Loading branch information
Jorge Turrado Ferrero authored Sep 10, 2021
1 parent a6e3d38 commit 64a9b0b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Add Bearer auth for Metrics API scaler ([#2028](https://github.com/kedacore/keda/pull/2028))
- Anonymize the host in case of HTTP failure (RabbitMQ Scaler) ([#2041](https://github.com/kedacore/keda/pull/2041))
- Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055))
- Add custom http timeout in RabbitMQ Scaler ([#2086](https://github.com/kedacore/keda/pull/2086))

### Breaking Changes

Expand Down
37 changes: 28 additions & 9 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"regexp"
"strconv"
"time"

"github.com/streadway/amqp"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -60,14 +61,15 @@ type rabbitMQScaler struct {

type rabbitMQMetadata struct {
queueName string
mode string // QueueLength or MessageRate
value int // trigger value (queue length or publish/sec. rate)
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
operation string // specify the operation to apply in case of multiples queues
metricName string // Custom metric name for trigger
mode string // QueueLength or MessageRate
value int // trigger value (queue length or publish/sec. rate)
host string // connection string for either HTTP or AMQP protocol
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
operation string // specify the operation to apply in case of multiples queues
metricName string // custom metric name for trigger
timeout time.Duration // custom http timeout for a specific trigger
}

type queueInfo struct {
Expand All @@ -93,11 +95,11 @@ var rabbitmqLog = logf.Log.WithName("rabbitmq_scaler")

// NewRabbitMQScaler creates a new rabbitMQ scaler
func NewRabbitMQScaler(config *ScalerConfig) (Scaler, error) {
httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)
meta, err := parseRabbitMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing rabbitmq metadata: %s", err)
}
httpClient := kedautil.CreateHTTPClient(meta.timeout)

if meta.protocol == httpProtocol {
return &rabbitMQScaler{
Expand Down Expand Up @@ -220,6 +222,23 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
}
}

// Resolve timeout
if val, ok := config.TriggerMetadata["timeout"]; ok {
timeoutMS, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("unable to parse timeout: %s", err)
}
if meta.protocol == amqpProtocol {
return nil, fmt.Errorf("amqp protocol doesn't support custom timeouts: %s", err)
}
if timeoutMS <= 0 {
return nil, fmt.Errorf("timeout must be greater than 0: %s", err)
}
meta.timeout = time.Duration(timeoutMS) * time.Millisecond
} else {
meta.timeout = config.GlobalHTTPTimeout
}

return &meta, nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true"}, false, map[string]string{}},
// custom metric name
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "metricName": "host1-sample"}, false, map[string]string{}},
// http valid timeout
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "1000"}, false, map[string]string{}},
// http invalid timeout
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "-10"}, true, map[string]string{}},
// http wrong timeout
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "error"}, true, map[string]string{}},
// amqp timeout
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "timeout": "10"}, true, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down

0 comments on commit 64a9b0b

Please sign in to comment.