From 72f25eec86a476fcd8637742f5b6c9f489feb5d2 Mon Sep 17 00:00:00 2001 From: Jirka Kremser Date: Thu, 10 Oct 2024 18:41:24 +0200 Subject: [PATCH 1/2] helper for logs Signed-off-by: Jirka Kremser --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile b/Makefile index 747ee3c..b82577c 100644 --- a/Makefile +++ b/Makefile @@ -71,6 +71,11 @@ dev-k3d: build-image ## Builds the container image for current arch, imports it helm upgrade --reuse-values --set image.tag=latest keda-otel helmchart/otel-add-on --set image.pullPolicy=IfNotPresent kubectl rollout restart deploy/otel-add-on-scaler +.PHONY: logs +logs: + @$(call say,logs) + kubectl logs -lapp.kubernetes.io/name=otel-add-on --tail=-1 --follow + CONTROLLER_GEN = ${HACK_BIN}/controller-gen controller-gen: ## Download controller-gen locally if necessary. GOBIN=$(shell pwd)/bin go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.15.0 From 5a696a2fd0545a9b72946e8f13f883ce88a82ea6 Mon Sep 17 00:00:00 2001 From: Jirka Kremser Date: Fri, 11 Oct 2024 12:21:45 +0200 Subject: [PATCH 2/2] <3 4 logs Signed-off-by: Jirka Kremser --- Makefile | 5 +- build/version.go | 10 +++- .../otel-add-on/templates/deployment.yaml | 7 ++- helmchart/otel-add-on/values.yaml | 19 ++++++- main.go | 39 +++++++------- metric/mem_store.go | 4 -- receiver/receiver.go | 53 ++++++++----------- scaler/handlers.go | 28 +++------- util/helpers.go | 8 +++ util/welcome.go | 6 +-- 10 files changed, 96 insertions(+), 83 deletions(-) diff --git a/Makefile b/Makefile index b82577c..399ada9 100644 --- a/Makefile +++ b/Makefile @@ -68,7 +68,10 @@ deploy-helm: ## Deploys helm chart with otel-collector and otel scaler. dev-k3d: build-image ## Builds the container image for current arch, imports it to running k3d and restarts the scaler. @$(call say,Doing the dev cycle) k3d image import ghcr.io/kedify/otel-add-on:latest - helm upgrade --reuse-values --set image.tag=latest keda-otel helmchart/otel-add-on --set image.pullPolicy=IfNotPresent + helm upgrade --reuse-values \ + --set image.tag=latest keda-otel helmchart/otel-add-on \ + --set image.pullPolicy=IfNotPresent \ + --set settings.logs.logLvl=debug kubectl rollout restart deploy/otel-add-on-scaler .PHONY: logs diff --git a/build/version.go b/build/version.go index 0ca153d..2bf32b7 100644 --- a/build/version.go +++ b/build/version.go @@ -5,6 +5,9 @@ import ( "runtime" "github.com/go-logr/logr" + "go.uber.org/zap/zapcore" + + "github.com/kedify/otel-add-on/util" ) var ( @@ -22,9 +25,14 @@ func GitCommit() string { return gitCommit } -func PrintComponentInfo(logger logr.Logger, component string) { +func PrintComponentInfo(logger logr.Logger, lvl zapcore.LevelEnabler, component string) { logger.Info(fmt.Sprintf("%s Version: %s", component, Version())) logger.Info(fmt.Sprintf("%s Commit: %s", component, GitCommit())) logger.Info(fmt.Sprintf("Go Version: %s", runtime.Version())) logger.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH)) + if lvl != nil { + logger.Info(fmt.Sprintf("Logger: %+v", lvl)) + logger.Info(fmt.Sprintf("Debug enabled: %+v", lvl.Enabled(util.DebugLvl))) + } + fmt.Println() } diff --git a/helmchart/otel-add-on/templates/deployment.yaml b/helmchart/otel-add-on/templates/deployment.yaml index f9237dd..9b192e5 100644 --- a/helmchart/otel-add-on/templates/deployment.yaml +++ b/helmchart/otel-add-on/templates/deployment.yaml @@ -45,12 +45,15 @@ spec: value: {{ .Values.settings.isActivePollingIntervalMilliseconds | quote }} {{- if .Values.settings.noColor }} - name: NO_COLOR - value: {{ .Values.settings.noColor | quote }} + value: {{ .Values.settings.logs.noColor | quote }} {{- end }} {{- if .Values.settings.noBanner }} - name: NO_BANNER - value: {{ .Values.settings.noBanner | quote }} + value: {{ .Values.settings.logs.noBanner | quote }} {{- end }} + args: + - --zap-log-level={{ .Values.settings.logs.logLvl }} + - --zap-stacktrace-level={{ .Values.settings.logs.stackTracesLvl }} livenessProbe: grpc: port: {{ .Values.service.kedaExternalScalerPort }} diff --git a/helmchart/otel-add-on/values.yaml b/helmchart/otel-add-on/values.yaml index dcc36c1..8becd22 100644 --- a/helmchart/otel-add-on/values.yaml +++ b/helmchart/otel-add-on/values.yaml @@ -8,10 +8,25 @@ image: tag: "" settings: + # how long the metrics should be kept in the short term (in memory) storage metricStoreRetentionSeconds: 120 + + # how often (in milliseconds) should the IsActive method be tried isActivePollingIntervalMilliseconds: 500 - noColor: false - noBanner: false + + logs: + # Can be one of 'debug', 'info', 'error', or any integer value > 0 + # which corresponds to custom debug levels of increasing verbosity + logLvl: info + + # one of: info, error, panic + stackTracesLvl: error + + # if anything else than 'false', the log will not contain colors + noColor: false + + # if anything else than 'false', the log will not print the ascii logo + noBanner: false asciiArt: true imagePullSecrets: [] diff --git a/main.go b/main.go index e2fb5f7..801f407 100644 --- a/main.go +++ b/main.go @@ -46,9 +46,10 @@ func main() { kedaExternalScalerPort := cfg.KedaExternalScalerPort metricStoreRetentionSeconds := cfg.MetricStoreRetentionSeconds - util.SetupLog(cfg.NoColor) + lvl := util.SetupLog(cfg.NoColor) + if !cfg.NoBanner { - util.PrintBanner(setupLog, cfg.NoColor) + util.PrintBanner(cfg.NoColor) } _, err := ctrl.GetConfig() if err != nil { @@ -94,31 +95,36 @@ func main() { }, }, } + settings := &rec.Settings{ - ID: component.MustNewIDWithName("bar", "foo"), + ID: component.MustNewIDWithName("id", "otlp-receiver"), BuildInfo: component.NewDefaultBuildInfo(), TelemetrySettings: componenttest.NewNopTelemetrySettings(), } - r, err := receiver.NewOtlpReceiver(conf, settings, ms) - + r, e := receiver.NewOtlpReceiver(conf, settings, ms, util.IsDebug(lvl)) + if e != nil { + setupLog.Error(e, "failed to create new OTLP receiver") + return e + } r.RegisterMetricsConsumer(mc{}) - r.Start(ctx, componenttest.NewNopHost()) - if err != nil { - setupLog.Error(err, "otlp receiver failed to create") - return err + e = r.Start(ctx, componenttest.NewNopHost()) + if e != nil { + setupLog.Error(e, "OTLP receiver failed to start") + return e } - setupLog.Info("starting the grpc server KEDA external push...") - if err := startGrpcServer(ctx, ctrl.Log, ms, mp, kedaExternalScalerPort); !util.IsIgnoredErr(err) { - setupLog.Error(err, "grpc server failed") - return err + kedaExternalScalerAddr := fmt.Sprintf("0.0.0.0:%d", kedaExternalScalerPort) + setupLog.Info("starting the grpc server for KEDA scaler", "address", kedaExternalScalerAddr) + if e = startGrpcServer(ctx, ctrl.Log, ms, mp, kedaExternalScalerAddr); !util.IsIgnoredErr(err) { + setupLog.Error(e, "grpc server failed") + return e } return nil }) - build.PrintComponentInfo(ctrl.Log, "OTEL addon for KEDA") + build.PrintComponentInfo(ctrl.Log, lvl, "OTEL addon for KEDA") if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { setupLog.Error(err, "fatal error") @@ -133,11 +139,8 @@ func startGrpcServer( lggr logr.Logger, ms types.MemStore, mp types.Parser, - port int, + addr string, ) error { - addr := fmt.Sprintf("0.0.0.0:%d", port) - lggr.Info("starting grpc server", "address", addr) - lis, err := net.Listen("tcp", addr) if err != nil { return err diff --git a/metric/mem_store.go b/metric/mem_store.go index 7732256..40f1180 100644 --- a/metric/mem_store.go +++ b/metric/mem_store.go @@ -42,7 +42,6 @@ func (m ms) Get(unescapedName types.MetricName, searchLabels types.Labels, timeO } return ret, true, nil } - fmt.Sprintf(" ---- Get: %v -> %v", name, md.AggregatesOverTime[timeOp]) return md.AggregatesOverTime[timeOp], true, nil } // multiple metric vectors match the search criteria @@ -67,7 +66,6 @@ func (m ms) Get(unescapedName types.MetricName, searchLabels types.Labels, timeO } } } - fmt.Sprintf(" ---- Get: %v -> %v", name, accumulator) return accumulator, true, nil } @@ -82,7 +80,6 @@ func checkDefaultAggregation(aggregation types.AggregationOverVectors) error { func (m ms) Put(entry types.NewMetricEntry) { name := escapeName(entry.Name) - fmt.Sprintf(" ---- Put: %v -> %v", name, entry.Value) if _, found := m.store[name]; !found { m.store[name] = make(map[types.LabelsHash]types.MetricData) } @@ -97,7 +94,6 @@ func (m ms) Put(entry types.NewMetricEntry) { notStale := util.Filter(md.Data, func(val types.ObservedValue) bool { return !m.isStale(val.Time, now) }) - fmt.Sprintf("not stale: %v", notStale) md.Data = append(notStale, types.ObservedValue{ Time: entry.Time, Value: entry.Value, diff --git a/receiver/receiver.go b/receiver/receiver.go index 61021b9..f4eb44e 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -39,17 +39,19 @@ type otlpReceiver struct { settings *receiver.Settings metricMemStore types.MemStore + debug bool } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func NewOtlpReceiver(cfg *otlpreceiver.Config, set *receiver.Settings, memStore types.MemStore) (*otlpReceiver, error) { +func NewOtlpReceiver(cfg *otlpreceiver.Config, set *receiver.Settings, memStore types.MemStore, debug bool) (*otlpReceiver, error) { r := &otlpReceiver{ cfg: cfg, nextMetrics: nil, settings: set, metricMemStore: memStore, + debug: debug, } var err error @@ -77,7 +79,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error { } if r.nextMetrics != nil { - pmetricotlp.RegisterGRPCServer(r.serverGRPC, New(r.nextMetrics, r.obsrepGRPC, r.metricMemStore)) + pmetricotlp.RegisterGRPCServer(r.serverGRPC, New(r.nextMetrics, r.obsrepGRPC, r.metricMemStore, r.debug)) } r.settings.Logger.Info("Starting GRPC server", zap.String("endpoint", r.cfg.GRPC.NetAddr.Endpoint)) @@ -99,7 +101,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error { // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. -func (r *otlpReceiver) Start(ctx context.Context, host component.Host) error { +func (r *otlpReceiver) Start(_ context.Context, host component.Host) error { if err := r.startGRPCServer(host); err != nil { return err } @@ -108,7 +110,7 @@ func (r *otlpReceiver) Start(ctx context.Context, host component.Host) error { } // Shutdown is a method to turn off receiving. -func (r *otlpReceiver) Shutdown(ctx context.Context) error { +func (r *otlpReceiver) Shutdown(_ context.Context) error { var err error if r.serverGRPC != nil { @@ -131,14 +133,16 @@ type Receiver struct { nextConsumer consumer.Metrics obsreport *receiverhelper.ObsReport metricMemStore types.MemStore + debug bool } // New creates a new Receiver reference. -func New(nextConsumer consumer.Metrics, obsreport *receiverhelper.ObsReport, memStore types.MemStore) *Receiver { +func New(nextConsumer consumer.Metrics, obsreport *receiverhelper.ObsReport, memStore types.MemStore, debug bool) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsreport: obsreport, metricMemStore: memStore, + debug: debug, } } @@ -149,8 +153,8 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p if dataPointCount == 0 { return pmetricotlp.NewExportResponse(), nil } - fmt.Printf("\n\nData point count: %d\n", dataPointCount) - //fmt.Printf("\n\nData points: %v\n", md.ResourceMetrics()) + // using the printf instead of logger makes the metric data nicer in logs + r.p("\n\nData point count: %d\n", dataPointCount) resLen := md.ResourceMetrics().Len() for i := 0; i < resLen; i++ { sm := md.ResourceMetrics().At(i).ScopeMetrics() @@ -159,8 +163,8 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p mLen := sm.At(j).Metrics().Len() metrics := sm.At(j).Metrics() for k := 0; k < mLen; k++ { - fmt.Printf("- name: %+v\n", metrics.At(k).Name()) - fmt.Printf(" type: %+v\n", metrics.At(k).Type()) + r.p("- name: %+v\n", metrics.At(k).Name()) + r.p(" type: %+v\n", metrics.At(k).Type()) var dataPoints pmetric.NumberDataPointSlice switch metrics.At(k).Type() { case pmetric.MetricTypeGauge: @@ -172,10 +176,10 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p } for l := 0; l < dataPoints.Len(); l++ { datapoint := dataPoints.At(l) - fmt.Printf(" - time: %+v\n", datapoint.Timestamp()) - fmt.Printf(" tags: %+v\n", datapoint.Attributes().AsRaw()) + r.p(" - time: %+v\n", datapoint.Timestamp()) + r.p(" tags: %+v\n", datapoint.Attributes().AsRaw()) value := math.Max(datapoint.DoubleValue(), float64(datapoint.IntValue())) - fmt.Printf(" value: %+v\n", value) + r.p(" value: %+v\n", value) r.metricMemStore.Put(types.NewMetricEntry{ Name: types.MetricName(metrics.At(k).Name()), ObservedValue: types.ObservedValue{ @@ -187,31 +191,12 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p } } } - //for k, v := range m { - // fmt.Printf("k=: %+v\n", k) - // fmt.Printf("v=: %+v\n", v) - //} - //for j := 0; j < md.ResourceMetrics().At(i).ScopeMetrics().Len(); i++ { - // md.ResourceMetrics(). - // fmt.Printf("222internal: %+v\n", md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics()) - // //m := md.ResourceMetrics().At(i).Resource().Attributes().AsRaw() - // //for k, v := range m { - // // fmt.Printf("k=: %+v\n", k) - // // fmt.Printf("v=: %+v\n", v) - // //} - //} } ctx = r.obsreport.StartMetricsOp(ctx) err := r.nextConsumer.ConsumeMetrics(ctx, md) r.obsreport.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) - // Use appropriate status codes for permanent/non-permanent errors - // If we return the error straightaway, then the grpc implementation will set status code to Unknown - // Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345 - // So, convert the error to appropriate grpc status and return the error - // NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503) - // Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400) if err != nil { return pmetricotlp.NewExportResponse(), GetStatusFromError(err) } @@ -219,6 +204,12 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p return pmetricotlp.NewExportResponse(), nil } +func (r *Receiver) p(format string, a ...any) { + if r.debug { + fmt.Printf(format, a...) + } +} + func GetStatusFromError(err error) error { s, ok := status.FromError(err) if !ok { diff --git a/scaler/handlers.go b/scaler/handlers.go index f884a5c..a22582d 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "math" - "strconv" "time" "github.com/go-logr/logr" @@ -152,26 +151,6 @@ func (e *impl) GetMetricSpec( return res, nil } -func (e *impl) interceptorMetricSpec(metricName string, interceptorTargetPendingRequests string) (*externalscaler.GetMetricSpecResponse, error) { - lggr := e.lggr.WithName("interceptorMetricSpec") - - targetPendingRequests, err := strconv.ParseInt(interceptorTargetPendingRequests, 10, 64) - if err != nil { - lggr.Error(err, "unable to parse interceptorTargetPendingRequests", "value", interceptorTargetPendingRequests) - return nil, err - } - - res := &externalscaler.GetMetricSpecResponse{ - MetricSpecs: []*externalscaler.MetricSpec{ - { - MetricName: metricName, - TargetSize: targetPendingRequests, - }, - }, - } - return res, nil -} - func (e *impl) GetMetrics( ctx context.Context, metricRequest *externalscaler.GetMetricsRequest, @@ -193,6 +172,12 @@ func (e *impl) GetMetrics( value, found, err := e.metricStore.Get(metricName, labels, opOverTime, agg) //lggr.V(1).Info("got metric value: ", "value", value, "found", found, "error", err) lggr.Info("got metric value: ", "name", metricName, "labels", labels, "value", value, "found", found, "error", err) + if !found { + return nil, fmt.Errorf("not found") + } + if err != nil { + return nil, err + } value = util.ClampValue(lggr, value, scalerMetadata) res := &externalscaler.GetMetricsResponse{ @@ -203,6 +188,7 @@ func (e *impl) GetMetrics( }, }, } + //fmt.Printf("GetMetrics: %v", res) //fmt.Printf("GetMetrics: name: %v target: %v", string(metricName), targetValue) return res, nil diff --git a/util/helpers.go b/util/helpers.go index d2b79da..6a076cf 100644 --- a/util/helpers.go +++ b/util/helpers.go @@ -6,9 +6,13 @@ import ( "strconv" "time" + "go.uber.org/zap/zapcore" + "github.com/kedify/otel-add-on/types" ) +const DebugLvl = -1 + func Map[I, R any](input []I, f func(I) R) []R { result := make([]R, len(input)) for i := range input { @@ -84,3 +88,7 @@ func CheckTimeOp(op types.OperationOverTime) error { return fmt.Errorf("unknown OperationOverTime:%s", op) } } + +func IsDebug(lvl zapcore.LevelEnabler) bool { + return lvl != nil && lvl.Enabled(DebugLvl) +} diff --git a/util/welcome.go b/util/welcome.go index 61284e7..0ba0a0f 100644 --- a/util/welcome.go +++ b/util/welcome.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/fatih/color" - "github.com/go-logr/logr" "go.uber.org/zap/zapcore" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -15,7 +14,7 @@ const ( fiveSpaces = " " ) -func PrintBanner(logger logr.Logger, noColor bool) { +func PrintBanner(noColor bool) { color.NoColor = noColor c1, c2, c3, c4 := color.FgBlue, color.FgWhite, color.FgCyan, color.FgHiYellow pad := fiveSpaces + fiveSpaces @@ -33,7 +32,7 @@ func PrintBanner(logger logr.Logger, noColor bool) { } // SetupLog tweak the default log to use custom time format and use colors if supported -func SetupLog(noColor bool) { +func SetupLog(noColor bool) zapcore.LevelEnabler { var opts zap.Options zap.UseDevMode(true)(&opts) zap.ConsoleEncoder(func(c *zapcore.EncoderConfig) { @@ -49,4 +48,5 @@ func SetupLog(noColor bool) { opts.BindFlags(flag.CommandLine) flag.Parse() ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + return opts.Level }