diff --git a/README.md b/README.md new file mode 100644 index 0000000..6811e5c --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +## `OTEL <--> keda` add-on + +![diagram](./diagram.png "Diagram") +https://excalidraw.com/#json=uHHt4TRtv23mlBVFJz7kw,Qdz2aeFARJb7LF6v3rSXcg + diff --git a/diagram.png b/diagram.png new file mode 100644 index 0000000..0c893c3 Binary files /dev/null and b/diagram.png differ diff --git a/helmchart/otel-add-on/templates/deployment.yaml b/helmchart/otel-add-on/templates/deployment.yaml index fc2dc66..dfbc2be 100644 --- a/helmchart/otel-add-on/templates/deployment.yaml +++ b/helmchart/otel-add-on/templates/deployment.yaml @@ -34,22 +34,35 @@ spec: {{- toYaml .Values.securityContext | nindent 12 }} image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} -# livenessProbe: -# grpc: -# port: 4318 -# service: liveness -# timeoutSeconds: 5 -# periodSeconds: 5 -# successThreshold: 1 -# failureThreshold: 6 -# readinessProbe: -# grpc: -# port: 4318 -# service: readiness -# timeoutSeconds: 1 -# periodSeconds: 5 -# successThreshold: 1 -# failureThreshold: 3 + env: + - name: OTLP_RECEIVER_PORT + value: {{ .Values.service.otlpReceiverPort | quote }} + - name: KEDA_EXTERNAL_SCALER_PORT + value: {{ .Values.service.kedaExternalScalerPort | quote }} + - name: METRIC_STORE_RETENTION_SECONDS + value: {{ .Values.settings.metricStoreRetentionSeconds | quote }} + - name: IS_ACTIVE_POLLING_INTERVAL_MS + value: {{ .Values.settings.isActivePollingIntervalMilliseconds | quote }} + - name: NO_COLOR + value: {{ .Values.settings.noColor | quote }} + - name: NO_BANNER + value: {{ .Values.settings.noBanner | quote }} + livenessProbe: + grpc: + port: {{ .Values.service.kedaExternalScalerPort }} + service: liveness + timeoutSeconds: 5 + periodSeconds: 5 + successThreshold: 1 + failureThreshold: 6 + readinessProbe: + grpc: + port: {{ .Values.service.kedaExternalScalerPort }} + service: readiness + timeoutSeconds: 1 + periodSeconds: 5 + successThreshold: 1 + failureThreshold: 3 resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.volumeMounts }} diff --git a/helmchart/otel-add-on/values.yaml b/helmchart/otel-add-on/values.yaml index dac46df..dcc36c1 100644 --- a/helmchart/otel-add-on/values.yaml +++ b/helmchart/otel-add-on/values.yaml @@ -7,6 +7,12 @@ image: pullPolicy: Always tag: "" +settings: + metricStoreRetentionSeconds: 120 + isActivePollingIntervalMilliseconds: 500 + noColor: false + noBanner: false + asciiArt: true imagePullSecrets: [] nameOverride: "" diff --git a/main.go b/main.go index ea52bd9..e2fb5f7 100644 --- a/main.go +++ b/main.go @@ -40,52 +40,26 @@ var ( setupLog = ctrl.Log.WithName("setup") ) -// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch -// +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch - func main() { - // todo: get rid of http addon dependencies cfg := scaler.MustParseConfig() otlpReceiverPort := cfg.OTLPReceiverPort - //namespace := cfg.TargetNamespace - //svcName := cfg.TargetService - //deplName := cfg.TargetDeployment - //targetPortStr := fmt.Sprintf("%d", cfg.TargetPort) - //targetPendingRequests := cfg.TargetPendingRequests - - //targetPendingRequests := cfg.TargetPendingRequests + kedaExternalScalerPort := cfg.KedaExternalScalerPort + metricStoreRetentionSeconds := cfg.MetricStoreRetentionSeconds util.SetupLog(cfg.NoColor) - util.PrintBanner(setupLog, cfg.NoColor) - - //k8sCfg, err := ctrl.GetConfig() + if !cfg.NoBanner { + util.PrintBanner(setupLog, cfg.NoColor) + } _, err := ctrl.GetConfig() if err != nil { setupLog.Error(err, "Kubernetes client config not found") os.Exit(1) } - //k8sCl, err := kubernetes.NewForConfig(k8sCfg) - //if err != nil { - // setupLog.Error(err, "creating new Kubernetes ClientSet") - // os.Exit(1) - //} - // create the endpoints informer - //endpInformer := k8s.NewInformerBackedEndpointsCache( - // ctrl.Log, - // k8sCl, - // cfg.DeploymentCacheRsyncPeriod, - //) - - //httpCl, err := clientset.NewForConfig(k8sCfg) if err != nil { setupLog.Error(err, "creating new HTTP ClientSet") os.Exit(1) } - //sharedInformerFactory := informers.NewSharedInformerFactory(httpCl, cfg.ConfigMapCacheRsyncPeriod) - //soInformer := informersv1alpha1.New(sharedInformerFactory, "", nil).ScaledObjects() - - //httpCl, err := clientset.NewForConfig(k8sCfg) if err != nil { setupLog.Error(err, "creating new HTTP ClientSet") os.Exit(1) @@ -95,34 +69,9 @@ func main() { ctx = util.ContextWithLogger(ctx, setupLog) eg, ctx := errgroup.WithContext(ctx) - ms := metric.NewMetricStore(5) + ms := metric.NewMetricStore(metricStoreRetentionSeconds) mp := metric.NewParser() - // start the endpoints informer - //eg.Go(func() error { - // setupLog.Info("starting the endpoints informer") - // endpInformer.Start(ctx) - // return nil - //}) - // - //// start the httpso informer - //eg.Go(func() error { - // setupLog.Info("starting the httpso informer") - // soInformer.Informer().Run(ctx.Done()) - // return nil - //}) - - //eg.Go(func() error { - // setupLog.Info("starting the queue pinger") - // - // //if err := pinger.start(ctx, time.NewTicker(cfg.QueueTickDuration), endpInformer); !util.IsIgnoredErr(err) { - // // setupLog.Error(err, "queue pinger failed") - // // return err - // //} - // - // return nil - //}) - eg.Go(func() error { addr := fmt.Sprintf("0.0.0.0:%d", otlpReceiverPort) setupLog.Info("starting the grpc server for OTLP receiver", "address", addr) @@ -161,8 +110,7 @@ func main() { } setupLog.Info("starting the grpc server KEDA external push...") - // todo: port cfg - if err := startGrpcServer(ctx, cfg, ctrl.Log, ms, mp, otlpReceiverPort+1); !util.IsIgnoredErr(err) { + if err := startGrpcServer(ctx, ctrl.Log, ms, mp, kedaExternalScalerPort); !util.IsIgnoredErr(err) { setupLog.Error(err, "grpc server failed") return err } @@ -182,7 +130,6 @@ func main() { func startGrpcServer( ctx context.Context, - cfg *scaler.Config, lggr logr.Logger, ms types.MemStore, mp types.Parser, @@ -213,15 +160,8 @@ func startGrpcServer( return // do our regularly scheduled work case <-ticker.C: - // if we haven't updated the endpoints in twice QueueTickDuration we drop the check - //if time.Now().After(pinger.lastPingTime.Add(cfg.QueueTickDuration * 2)) { - // hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_NOT_SERVING) - // hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_NOT_SERVING) - //} else { - // // we propagate pinger status as scaler status - hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_ServingStatus(1)) - hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_ServingStatus(1)) - //} + hs.SetServingStatus("liveness", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("readiness", grpc_health_v1.HealthCheckResponse_SERVING) } } }() @@ -237,7 +177,6 @@ func startGrpcServer( lggr, ms, mp, - //soInformer, ), ) diff --git a/scaler/config.go b/scaler/config.go index b69b559..7d44385 100644 --- a/scaler/config.go +++ b/scaler/config.go @@ -1,38 +1,15 @@ package scaler import ( - "time" - "github.com/kelseyhightower/envconfig" ) type Config struct { - OTLPReceiverPort int `envconfig:"OTLP_RECEIVER_PORT" default:"4317"` - NoColor bool `envconfig:"NO_COLOR" default:"false"` - // TargetNamespace is the namespace in which this scaler is running, and the namespace - // that the target interceptors are running in. This scaler and all the interceptors - // must be running in the same namespace - //TargetNamespace string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_NAMESPACE" required:"true"` - // TargetService is the name of the service to issue metrics RPC requests to interceptors - //TargetService string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_SERVICE" required:"true"` - // TargetDeployment is the name of the deployment to issue metrics RPC requests to interceptors - //TargetDeployment string `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_DEPLOYMENT" required:"true"` - // TargetPort is the port on TargetService to which to issue metrics RPC requests to - // interceptors - //TargetPort int `envconfig:"KEDA_HTTP_SCALER_TARGET_ADMIN_PORT" required:"true"` - // TargetPendingRequests is the default value for the - // pending requests value that the scaler will return to - // KEDA, if that value is not set on an incoming - // `HTTPScaledObject` - TargetPendingRequests int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS" default:"100"` - // ConfigMapCacheRsyncPeriod is the time interval - // for the configmap informer to rsync the local cache. - ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"` - // DeploymentCacheRsyncPeriod is the time interval - // for the deployment informer to rsync the local cache. - DeploymentCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_DEPLOYMENT_INFORMER_RSYNC_PERIOD" default:"60m"` - // QueueTickDuration is the duration between queue requests - QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"` + OTLPReceiverPort int `envconfig:"OTLP_RECEIVER_PORT" default:"4317"` + NoColor bool `envconfig:"NO_COLOR" default:"false"` + NoBanner bool `envconfig:"NO_BANNER" default:"false"` + KedaExternalScalerPort int `envconfig:"KEDA_EXTERNAL_SCALER_PORT" default:"4318"` + MetricStoreRetentionSeconds int `envconfig:"METRIC_STORE_RETENTION_SECONDS" default:"120"` } func MustParseConfig() *Config { diff --git a/scaler/handlers.go b/scaler/handlers.go index 313267f..f884a5c 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -20,15 +20,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -const ( - keyInterceptorTargetPendingRequests = "interceptorTargetPendingRequests" -) - var streamInterval time.Duration func init() { - defaultMS := 200 - timeoutMS, err := util.ResolveOsEnvInt("KEDA_HTTP_SCALER_STREAM_INTERVAL_MS", defaultMS) + defaultMS := 500 + timeoutMS, err := util.ResolveOsEnvInt("IS_ACTIVE_POLLING_INTERVAL_MS", defaultMS) if err != nil { timeoutMS = defaultMS }