Skip to content

Commit

Permalink
Add HTTPScaler
Browse files Browse the repository at this point in the history
Closes: kedacore#929

Signed-off-by: Tomek Urbaszek <tomasz.urbaszek@polidea.com>
  • Loading branch information
turbaszek committed Aug 22, 2020
1 parent 9341259 commit d4c806e
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ func (h *ScaleHandler) getScaler(name, namespace, triggerType string, resolvedEn
return scalers.NewRedisStreamsScaler(resolvedEnv, triggerMetadata, authParams)
case "artemis-queue":
return scalers.NewArtemisQueueScaler(resolvedEnv, triggerMetadata, authParams)
case "http":
return scalers.NewHTTPScaler(resolvedEnv, triggerMetadata, authParams)
default:
return nil, fmt.Errorf("no scaler found for type: %s", triggerType)
}
Expand Down
149 changes: 149 additions & 0 deletions pkg/scalers/http_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package scalers

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"k8s.io/api/autoscaling/v2beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
"net/http"
"net/url"
"path"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"strconv"
)

const (
httpMetricName = "HTTPMetricValue"
)

type httpScaler struct {
metadata *httpScalerMetadata
}

type httpScalerMetadata struct {
targetValue int
apiURL *url.URL
metricName string
}

type metric struct {
Name string `json:"name"`
Value float64 `json:"value"`
}

var httpLog = logf.Log.WithName("http_scaler")

// NewHTTPScaler creates a new HTTP scaler
func NewHTTPScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) {
meta, err := parseHTTPMetadata(resolvedEnv, metadata, authParams)
if err != nil {
return nil, fmt.Errorf("error parsing HTTP metadata: %s", err)
}
return &httpScaler{metadata: meta}, nil
}

func parseHTTPMetadata(resolvedEnv, metadata, authParams map[string]string) (*httpScalerMetadata, error) {
meta := httpScalerMetadata{}

if val, ok := metadata["targetValue"]; ok {
targetValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("targetValue parsing error %s", err.Error())
}
meta.targetValue = targetValue
} else {
return nil, fmt.Errorf("no targetValue given in metadata")
}

if val, ok := metadata["apiURL"]; ok {
apiURL, err := url.Parse(val)
if err != nil {
return nil, fmt.Errorf("apiURL parsing error %s", err.Error())
}
meta.apiURL = apiURL
} else {
return nil, fmt.Errorf("no apiURL given in metadata")
}

if val, ok := metadata["metricName"]; ok {
meta.metricName = val
} else {
return nil, fmt.Errorf("no metricName given in metadata")
}

return &meta, nil
}

func (s *httpScaler) checkHealth() error {
u := path.Join(s.metadata.apiURL.RawPath, "health")
_, err := http.Get(u)
return err
}

func (s *httpScaler) getMetricInfo() (*metric, error) {
var m *metric
u := path.Join(s.metadata.apiURL.RawPath, "metrics", s.metadata.metricName)
r, err := http.Get(u)
if err != nil {
return nil, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &m)
if err != nil {
return nil, err
}
return m, nil
}

// Close does nothing in case of httpScaler
func (s *httpScaler) Close() error {
return nil
}

// IsActive returns true if there are pending messages to be processed
func (s *httpScaler) IsActive(ctx context.Context) (bool, error) {
err := s.checkHealth()
if err != nil {
httpLog.Error(err, fmt.Sprintf("Error when checking API health: %s", err))
return false, err
}
return true, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *httpScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI)
externalMetric := &v2beta1.ExternalMetricSource{
MetricName: httpMetricName,
TargetAverageValue: targetQueryValue,
}
metricSpec := v2beta1.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta1.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *httpScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
m, err := s.getMetricInfo()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error requesting metrics endpoint: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(m.Value), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

0 comments on commit d4c806e

Please sign in to comment.