Skip to content

Commit

Permalink
readme, diagram, env vars
Browse files Browse the repository at this point in the history
Signed-off-by: Jirka Kremser <jiri.kremser@gmail.com>
  • Loading branch information
jkremser committed Oct 10, 2024
1 parent c64b04f commit ec32c62
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 120 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## `OTEL <--> keda` add-on

![diagram](./diagram.png "Diagram")
https://excalidraw.com/#json=uHHt4TRtv23mlBVFJz7kw,Qdz2aeFARJb7LF6v3rSXcg

Binary file added diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
45 changes: 29 additions & 16 deletions helmchart/otel-add-on/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,35 @@ spec:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
# livenessProbe:
# grpc:
# port: 4318
# service: liveness
# timeoutSeconds: 5
# periodSeconds: 5
# successThreshold: 1
# failureThreshold: 6
# readinessProbe:
# grpc:
# port: 4318
# service: readiness
# timeoutSeconds: 1
# periodSeconds: 5
# successThreshold: 1
# failureThreshold: 3
env:
- name: OTLP_RECEIVER_PORT
value: {{ .Values.service.otlpReceiverPort | quote }}
- name: KEDA_EXTERNAL_SCALER_PORT
value: {{ .Values.service.kedaExternalScalerPort | quote }}
- name: METRIC_STORE_RETENTION_SECONDS
value: {{ .Values.settings.metricStoreRetentionSeconds | quote }}
- name: IS_ACTIVE_POLLING_INTERVAL_MS
value: {{ .Values.settings.isActivePollingIntervalMilliseconds | quote }}
- name: NO_COLOR
value: {{ .Values.settings.noColor | quote }}
- name: NO_BANNER
value: {{ .Values.settings.noBanner | quote }}
livenessProbe:
grpc:
port: {{ .Values.service.kedaExternalScalerPort }}
service: liveness
timeoutSeconds: 5
periodSeconds: 5
successThreshold: 1
failureThreshold: 6
readinessProbe:
grpc:
port: {{ .Values.service.kedaExternalScalerPort }}
service: readiness
timeoutSeconds: 1
periodSeconds: 5
successThreshold: 1
failureThreshold: 3
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.volumeMounts }}
Expand Down
6 changes: 6 additions & 0 deletions helmchart/otel-add-on/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ image:
pullPolicy: Always
tag: ""

settings:
metricStoreRetentionSeconds: 120
isActivePollingIntervalMilliseconds: 500
noColor: false
noBanner: false

asciiArt: true
imagePullSecrets: []
nameOverride: ""
Expand Down
79 changes: 9 additions & 70 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,52 +40,26 @@ var (
setupLog = ctrl.Log.WithName("setup")
)

// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch

func main() {
// todo: get rid of http addon dependencies
cfg := scaler.MustParseConfig()
otlpReceiverPort := cfg.OTLPReceiverPort
//namespace := cfg.TargetNamespace
//svcName := cfg.TargetService
//deplName := cfg.TargetDeployment
//targetPortStr := fmt.Sprintf("%d", cfg.TargetPort)
//targetPendingRequests := cfg.TargetPendingRequests

//targetPendingRequests := cfg.TargetPendingRequests
kedaExternalScalerPort := cfg.KedaExternalScalerPort
metricStoreRetentionSeconds := cfg.MetricStoreRetentionSeconds

util.SetupLog(cfg.NoColor)
util.PrintBanner(setupLog, cfg.NoColor)

//k8sCfg, err := ctrl.GetConfig()
if !cfg.NoBanner {
util.PrintBanner(setupLog, cfg.NoColor)
}
_, err := ctrl.GetConfig()
if err != nil {
setupLog.Error(err, "Kubernetes client config not found")
os.Exit(1)
}
//k8sCl, err := kubernetes.NewForConfig(k8sCfg)
//if err != nil {
// setupLog.Error(err, "creating new Kubernetes ClientSet")
// os.Exit(1)
//}

// create the endpoints informer
//endpInformer := k8s.NewInformerBackedEndpointsCache(
// ctrl.Log,
// k8sCl,
// cfg.DeploymentCacheRsyncPeriod,
//)

//httpCl, err := clientset.NewForConfig(k8sCfg)
if err != nil {
setupLog.Error(err, "creating new HTTP ClientSet")
os.Exit(1)
}
//sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, cfg.ConfigMapCacheRsyncPeriod)
//soInformer := informersv1alpha1.New(sharedInformerFactory, "", nil).ScaledObjects()

//httpCl, err := clientset.NewForConfig(k8sCfg)
if err != nil {
setupLog.Error(err, "creating new HTTP ClientSet")
os.Exit(1)
Expand All @@ -95,34 +69,9 @@ func main() {
ctx = util.ContextWithLogger(ctx, setupLog)

eg, ctx := errgroup.WithContext(ctx)
ms := metric.NewMetricStore(5)
ms := metric.NewMetricStore(metricStoreRetentionSeconds)
mp := metric.NewParser()

// start the endpoints informer
//eg.Go(func() error {
// setupLog.Info("starting the endpoints informer")
// endpInformer.Start(ctx)
// return nil
//})
//
//// start the httpso informer
//eg.Go(func() error {
// setupLog.Info("starting the httpso informer")
// soInformer.Informer().Run(ctx.Done())
// return nil
//})

//eg.Go(func() error {
// setupLog.Info("starting the queue pinger")
//
// //if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), endpInformer); !util.IsIgnoredErr(err) {
// // setupLog.Error(err, "queue pinger failed")
// // return err
// //}
//
// return nil
//})

eg.Go(func() error {
addr := fmt.Sprintf("0.0.0.0:%d", otlpReceiverPort)
setupLog.Info("starting the grpc server for OTLP receiver", "address", addr)
Expand Down Expand Up @@ -161,8 +110,7 @@ func main() {
}

setupLog.Info("starting the grpc server KEDA external push...")
// todo: port cfg
if err := startGrpcServer(ctx, cfg, ctrl.Log, ms, mp, otlpReceiverPort+1); !util.IsIgnoredErr(err) {
if err := startGrpcServer(ctx, ctrl.Log, ms, mp, kedaExternalScalerPort); !util.IsIgnoredErr(err) {
setupLog.Error(err, "grpc server failed")
return err
}
Expand All @@ -182,7 +130,6 @@ func main() {

func startGrpcServer(
ctx context.Context,
cfg *scaler.Config,
lggr logr.Logger,
ms types.MemStore,
mp types.Parser,
Expand Down Expand Up @@ -213,15 +160,8 @@ func startGrpcServer(
return
// do our regularly scheduled work
case <-ticker.C:
// if we haven't updated the endpoints in twice QueueTickDuration we drop the check
//if time.Now().After(pinger.lastPingTime.Add(cfg.QueueTickDuration * 2)) {
// hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
// hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
//} else {
// // we propagate pinger status as scaler status
hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_ServingStatus(1))
hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_ServingStatus(1))
//}
hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_SERVING)
}
}
}()
Expand All @@ -237,7 +177,6 @@ func startGrpcServer(
lggr,
ms,
mp,
//soInformer,
),
)

Expand Down
33 changes: 5 additions & 28 deletions scaler/config.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,15 @@
package scaler

import (
"time"

"github.com/kelseyhightower/envconfig"
)

type Config struct {
OTLPReceiverPort int `envconfig:"OTLP_RECEIVER_PORT" default:"4317"`
NoColor bool `envconfig:"NO_COLOR" default:"false"`
// TargetNamespace is the namespace in which this scaler is running, and the namespace
// that the target interceptors are running in. This scaler and all the interceptors
// must be running in the same namespace
//TargetNamespace string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_NAMESPACE" required:"true"`
// TargetService is the name of the service to issue metrics RPC requests to interceptors
//TargetService string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_SERVICE" required:"true"`
// TargetDeployment is the name of the deployment to issue metrics RPC requests to interceptors
//TargetDeployment string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_DEPLOYMENT" required:"true"`
// TargetPort is the port on TargetService to which to issue metrics RPC requests to
// interceptors
//TargetPort int `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_PORT" required:"true"`
// TargetPendingRequests is the default value for the
// pending requests value that the scaler will return to
// KEDA, if that value is not set on an incoming
// `HTTPScaledObject`
TargetPendingRequests int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS" default:"100"`
// ConfigMapCacheRsyncPeriod is the time interval
// for the configmap informer to rsync the local cache.
ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
// DeploymentCacheRsyncPeriod is the time interval
// for the deployment informer to rsync the local cache.
DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"`
// QueueTickDuration is the duration between queue requests
QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"`
OTLPReceiverPort int `envconfig:"OTLP_RECEIVER_PORT" default:"4317"`
NoColor bool `envconfig:"NO_COLOR" default:"false"`
NoBanner bool `envconfig:"NO_BANNER" default:"false"`
KedaExternalScalerPort int `envconfig:"KEDA_EXTERNAL_SCALER_PORT" default:"4318"`
MetricStoreRetentionSeconds int `envconfig:"METRIC_STORE_RETENTION_SECONDS" default:"120"`
}

func MustParseConfig() *Config {
Expand Down
8 changes: 2 additions & 6 deletions scaler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

const (
keyInterceptorTargetPendingRequests = "interceptorTargetPendingRequests"
)

var streamInterval time.Duration

func init() {
defaultMS := 200
timeoutMS, err := util.ResolveOsEnvInt("KEDA_HTTP_SCALER_STREAM_INTERVAL_MS", defaultMS)
defaultMS := 500
timeoutMS, err := util.ResolveOsEnvInt("IS_ACTIVE_POLLING_INTERVAL_MS", defaultMS)
if err != nil {
timeoutMS = defaultMS
}
Expand Down

0 comments on commit ec32c62

Please sign in to comment.