Skip to content

Commit

Permalink
fix conflicts Signed-off-by: alex60217101990 <alex6021710@gmail.com>
Browse files Browse the repository at this point in the history
  • Loading branch information
alex60217101990 committed Jan 10, 2022
1 parent e986a81 commit 1ba8a63
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 46 deletions.
113 changes: 67 additions & 46 deletions pkg/scalers/predictkube_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@ package scalers

import (
"context"
"crypto/tls"
"errors"
"fmt"
"math"
"strconv"
"time"

libs "github.com/dysnix/predictkube-libs/external/configs"
pc "github.com/dysnix/predictkube-libs/external/grpc/client"
"github.com/dysnix/predictkube-libs/external/http_transport"
tc "github.com/dysnix/predictkube-libs/external/types_convertation"
"github.com/dysnix/predictkube-proto/external/proto/commonproto"
pb "github.com/dysnix/predictkube-proto/external/proto/services"
validator "github.com/go-playground/validator/v10"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/go-playground/validator/v10"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
str2duration "github.com/xhit/go-str2duration/v2"
"github.com/xhit/go-str2duration/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"k8s.io/api/autoscaling/v2beta2"
Expand All @@ -29,17 +25,53 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

libs "github.com/dysnix/predictkube-libs/external/configs"
pc "github.com/dysnix/predictkube-libs/external/grpc/client"
"github.com/dysnix/predictkube-libs/external/http_transport"
tc "github.com/dysnix/predictkube-libs/external/types_convertation"
"github.com/dysnix/predictkube-proto/external/proto/commonproto"
pb "github.com/dysnix/predictkube-proto/external/proto/services"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
predictKubeMetricType = "External"

invalidMetricTypeErr = "metric type is invalid"
)

var (
mlEngineHost = "predictkube-dev.dysnix.org"
mlEnginePort = 8080
mlEngineHost = "api.predictkube.com"
mlEnginePort = 443

defaultStep = time.Minute * 5

grpcConf = &libs.GRPC{
Enabled: true,
UseReflection: true,
Compression: libs.Compression{
Enabled: false,
},
Conn: &libs.Connection{
Host: mlEngineHost,
Port: uint16(mlEnginePort),
ReadBufferSize: 50 << 20,
WriteBufferSize: 50 << 20,
MaxMessageSize: 50 << 20,
Insecure: false,
Timeout: time.Second * 15,
},
Keepalive: &libs.Keepalive{
Time: time.Minute * 5,
Timeout: time.Minute * 5,
EnforcementPolicy: &libs.EnforcementPolicy{
MinTime: time.Minute * 20,
PermitWithoutStream: false,
},
},
}
)

type PredictKubeScaler struct {
Expand Down Expand Up @@ -67,45 +99,29 @@ type predictKubeMetadata struct {
var predictKubeLog = logf.Log.WithName("predictkube_scaler")

func (pks *PredictKubeScaler) setupClientConn() error {
clientOpt, err := pc.SetGrpcClientOptions(&libs.GRPC{
Enabled: true,
UseReflection: true,
Compression: libs.Compression{
Enabled: false,
},
Conn: &libs.Connection{
Host: mlEngineHost,
Port: uint16(mlEnginePort),
ReadBufferSize: 50 << 20,
WriteBufferSize: 50 << 20,
MaxMessageSize: 50 << 20,
Insecure: true,
Timeout: time.Second * 15,
},
Keepalive: &libs.Keepalive{
Time: time.Minute * 5,
Timeout: time.Minute * 5,
EnforcementPolicy: &libs.EnforcementPolicy{
MinTime: time.Minute * 20,
PermitWithoutStream: false,
},
},
},
clientOpt, err := pc.SetGrpcClientOptions(grpcConf,
&libs.Base{
Monitoring: libs.Monitoring{
Enabled: false,
},
Profiling: libs.Profiling{
Enabled: false,
},
IsDebugMode: true,
Single: &libs.Single{
Enabled: false,
},
},
pc.InjectPublicClientMetadataInterceptor(pks.metadata.apiKey),
)

if !grpcConf.Conn.Insecure {
clientOpt = append(clientOpt, grpc.WithTransportCredentials(
credentials.NewTLS(&tls.Config{
ServerName: mlEngineHost,
}),
))
}

if err != nil {
return err
}
Expand All @@ -122,7 +138,7 @@ func (pks *PredictKubeScaler) setupClientConn() error {
}

// NewPredictKubeScaler creates a new PredictKube scaler
func NewPredictKubeScaler(_ context.Context, config *ScalerConfig) (*PredictKubeScaler, error) {
func NewPredictKubeScaler(ctx context.Context, config *ScalerConfig) (*PredictKubeScaler, error) {
s := &PredictKubeScaler{}

meta, err := parsePredictKubeMetadata(config)
Expand All @@ -133,7 +149,7 @@ func NewPredictKubeScaler(_ context.Context, config *ScalerConfig) (*PredictKube

s.metadata = meta

err = s.initPredictKubePrometheusConn()
err = s.initPredictKubePrometheusConn(ctx)
if err != nil {
predictKubeLog.Error(err, "error create Prometheus client and API objects")
return nil, fmt.Errorf("error create Prometheus client and API objects: %3s", err)
Expand All @@ -151,18 +167,21 @@ func NewPredictKubeScaler(_ context.Context, config *ScalerConfig) (*PredictKube
// IsActive returns true if we are able to get metrics from PredictKube
func (pks *PredictKubeScaler) IsActive(ctx context.Context) (bool, error) {
results, err := pks.doQuery(ctx)
// TODO:
if err != nil {
return false, err
}

resp, err := pks.healthClient.Check(ctx, &health.HealthCheckRequest{})

if resp == nil || err != nil {
err = fmt.Errorf("can't connect grpc server: %v, code: %v", err, status.Code(err))
if resp == nil {
return len(results) > 0, fmt.Errorf("can't connect grpc server: empty server response, code: %v", codes.Unknown)
}

return len(results) > 0 && err == nil, err
if err != nil {
return len(results) > 0, fmt.Errorf("can't connect grpc server: %v, code: %v", err, status.Code(err))
}

return len(results) > 0, nil
}

func (pks *PredictKubeScaler) Close(_ context.Context) error {
Expand Down Expand Up @@ -202,7 +221,7 @@ func (pks *PredictKubeScaler) GetMetrics(ctx context.Context, metricName string,
return nil, err
}

predictKubeLog.Info(fmt.Sprintf("predict value is: %d", value))
predictKubeLog.V(1).Info(fmt.Sprintf("predict value is: %d", value))

val := *resource.NewQuantity(value, resource.DecimalSI)

Expand Down Expand Up @@ -337,6 +356,8 @@ func (pks *PredictKubeScaler) parsePrometheusResult(result model.Value) (out []*
MetricName: metricName,
})
}
default:
return nil, errors.New(invalidMetricTypeErr)
}

return out, nil
Expand Down Expand Up @@ -410,7 +431,7 @@ func parsePredictKubeMetadata(config *ScalerConfig) (result *predictKubeMetadata

err = validate.Var(val, "ascii")
if err != nil {
return nil, fmt.Errorf("invalid apiKey")
return nil, fmt.Errorf("invalid metricName")
}

meta.metricName = val
Expand Down Expand Up @@ -440,7 +461,7 @@ func (pks *PredictKubeScaler) ping(ctx context.Context) (err error) {
}

// initPredictKubePrometheusConn init prometheus client and setup connection to API
func (pks *PredictKubeScaler) initPredictKubePrometheusConn() (err error) {
func (pks *PredictKubeScaler) initPredictKubePrometheusConn(ctx context.Context) (err error) {
pks.transport = http_transport.NewTransport(&libs.HTTPTransport{
MaxIdleConnDuration: 10,
ReadTimeout: time.Second * 15,
Expand All @@ -459,5 +480,5 @@ func (pks *PredictKubeScaler) initPredictKubePrometheusConn() (err error) {

pks.api = v1.NewAPI(pks.prometheusClient)

return pks.ping(context.Background())
return pks.ping(ctx)
}
2 changes: 2 additions & 0 deletions pkg/scalers/predictkube_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ func TestPredictKubeGetMetricSpecForScaling(t *testing.T) {
}

func TestPredictKubeGetMetrics(t *testing.T) {
grpcConf.Conn.Insecure = true

mockPredictServer, grpcServer := runMockGrpcPredictServer()
<-time.After(time.Second * 3)
defer func() {
Expand Down

0 comments on commit 1ba8a63

Please sign in to comment.