Merge pull request #2 from jkremser/test-pr-2
logs in general
jkremser authored Oct 11, 2024
2 parents e822300 + 5a696a2 commit c4194f3
Showing 10 changed files with 101 additions and 83 deletions.
10 changes: 9 additions & 1 deletion Makefile
@@ -68,9 +68,17 @@ deploy-helm: ## Deploys helm chart with otel-collector and otel scaler.
dev-k3d: build-image ## Builds the container image for current arch, imports it to running k3d and restarts the scaler.
@$(call say,Doing the dev cycle)
k3d image import ghcr.io/kedify/otel-add-on:latest
helm upgrade --reuse-values --set image.tag=latest keda-otel helmchart/otel-add-on --set image.pullPolicy=IfNotPresent
helm upgrade --reuse-values \
--set image.tag=latest keda-otel helmchart/otel-add-on \
--set image.pullPolicy=IfNotPresent \
--set settings.logs.logLvl=debug
kubectl rollout restart deploy/otel-add-on-scaler

.PHONY: logs
logs:
@$(call say,logs)
kubectl logs -lapp.kubernetes.io/name=otel-add-on --tail=-1 --follow

CONTROLLER_GEN = ${HACK_BIN}/controller-gen
controller-gen: ## Download controller-gen locally if necessary.
GOBIN=$(shell pwd)/bin go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.15.0
10 changes: 9 additions & 1 deletion build/version.go
@@ -5,6 +5,9 @@ import (
"runtime"

"github.com/go-logr/logr"
"go.uber.org/zap/zapcore"

"github.com/kedify/otel-add-on/util"
)

var (
@@ -22,9 +25,14 @@ func GitCommit() string {
return gitCommit
}

func PrintComponentInfo(logger logr.Logger, component string) {
func PrintComponentInfo(logger logr.Logger, lvl zapcore.LevelEnabler, component string) {
logger.Info(fmt.Sprintf("%s Version: %s", component, Version()))
logger.Info(fmt.Sprintf("%s Commit: %s", component, GitCommit()))
logger.Info(fmt.Sprintf("Go Version: %s", runtime.Version()))
logger.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH))
if lvl != nil {
logger.Info(fmt.Sprintf("Logger: %+v", lvl))
logger.Info(fmt.Sprintf("Debug enabled: %+v", lvl.Enabled(util.DebugLvl)))
}
fmt.Println()
}
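
For orientation, a minimal sketch (not part of this commit) of how the new PrintComponentInfo signature might be called: a logr.Logger backed by zap plus a zap.AtomicLevel, which satisfies zapcore.LevelEnabler. The zapr/zap wiring here is illustrative only.

package main

import (
	"github.com/go-logr/zapr"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"

	"github.com/kedify/otel-add-on/build"
)

func main() {
	// zap.AtomicLevel implements zapcore.LevelEnabler, so it can be passed directly.
	lvl := zap.NewAtomicLevelAt(zapcore.DebugLevel)
	zl, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	// Passing nil instead of lvl would skip the extra "Logger"/"Debug enabled" lines.
	build.PrintComponentInfo(zapr.NewLogger(zl), lvl, "OTEL addon for KEDA")
}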
7 changes: 5 additions & 2 deletions helmchart/otel-add-on/templates/deployment.yaml
@@ -45,12 +45,15 @@ spec:
value: {{ .Values.settings.isActivePollingIntervalMilliseconds | quote }}
{{- if .Values.settings.noColor }}
- name: NO_COLOR
value: {{ .Values.settings.noColor | quote }}
value: {{ .Values.settings.logs.noColor | quote }}
{{- end }}
{{- if .Values.settings.noBanner }}
- name: NO_BANNER
value: {{ .Values.settings.noBanner | quote }}
value: {{ .Values.settings.logs.noBanner | quote }}
{{- end }}
args:
- --zap-log-level={{ .Values.settings.logs.logLvl }}
- --zap-stacktrace-level={{ .Values.settings.logs.stackTracesLvl }}
livenessProbe:
grpc:
port: {{ .Values.service.kedaExternalScalerPort }}
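
The two new container args are controller-runtime's standard zap flags. A hedged sketch of how such flags are commonly bound on the Go side (the usual kubebuilder pattern; the scaler's actual flag handling is not shown in this diff):

package main

import (
	"flag"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	var opts zap.Options
	// BindFlags registers --zap-log-level, --zap-stacktrace-level, --zap-devel, ...
	opts.BindFlags(flag.CommandLine)
	flag.Parse()
	ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
}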
19 changes: 17 additions & 2 deletions helmchart/otel-add-on/values.yaml
@@ -8,10 +8,25 @@ image:
tag: ""

settings:
# how long the metrics should be kept in the short term (in memory) storage
metricStoreRetentionSeconds: 120

# how often (in milliseconds) should the IsActive method be tried
isActivePollingIntervalMilliseconds: 500
noColor: false
noBanner: false

logs:
# Can be one of 'debug', 'info', 'error', or any integer value > 0
# which corresponds to custom debug levels of increasing verbosity
logLvl: info

# one of: info, error, panic
stackTracesLvl: error

# if anything else than 'false', the log will not contain colors
noColor: false

# if anything else than 'false', the log will not print the ascii logo
noBanner: false

asciiArt: true
imagePullSecrets: []
39 changes: 21 additions & 18 deletions main.go
@@ -46,9 +46,10 @@ func main() {
kedaExternalScalerPort := cfg.KedaExternalScalerPort
metricStoreRetentionSeconds := cfg.MetricStoreRetentionSeconds

util.SetupLog(cfg.NoColor)
lvl := util.SetupLog(cfg.NoColor)

if !cfg.NoBanner {
util.PrintBanner(setupLog, cfg.NoColor)
util.PrintBanner(cfg.NoColor)
}
_, err := ctrl.GetConfig()
if err != nil {
@@ -94,31 +95,36 @@ func main() {
},
},
}

settings := &rec.Settings{
ID: component.MustNewIDWithName("bar", "foo"),
ID: component.MustNewIDWithName("id", "otlp-receiver"),
BuildInfo: component.NewDefaultBuildInfo(),
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
}
r, err := receiver.NewOtlpReceiver(conf, settings, ms)

r, e := receiver.NewOtlpReceiver(conf, settings, ms, util.IsDebug(lvl))
if e != nil {
setupLog.Error(e, "failed to create new OTLP receiver")
return e
}
r.RegisterMetricsConsumer(mc{})

r.Start(ctx, componenttest.NewNopHost())
if err != nil {
setupLog.Error(err, "otlp receiver failed to create")
return err
e = r.Start(ctx, componenttest.NewNopHost())
if e != nil {
setupLog.Error(e, "OTLP receiver failed to start")
return e
}

setupLog.Info("starting the grpc server KEDA external push...")
if err := startGrpcServer(ctx, ctrl.Log, ms, mp, kedaExternalScalerPort); !util.IsIgnoredErr(err) {
setupLog.Error(err, "grpc server failed")
return err
kedaExternalScalerAddr := fmt.Sprintf("0.0.0.0:%d", kedaExternalScalerPort)
setupLog.Info("starting the grpc server for KEDA scaler", "address", kedaExternalScalerAddr)
if e = startGrpcServer(ctx, ctrl.Log, ms, mp, kedaExternalScalerAddr); !util.IsIgnoredErr(err) {
setupLog.Error(e, "grpc server failed")
return e
}

return nil
})

build.PrintComponentInfo(ctrl.Log, "OTEL addon for KEDA")
build.PrintComponentInfo(ctrl.Log, lvl, "OTEL addon for KEDA")

if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
setupLog.Error(err, "fatal error")
@@ -133,11 +139,8 @@ func startGrpcServer(
lggr logr.Logger,
ms types.MemStore,
mp types.Parser,
port int,
addr string,
) error {
addr := fmt.Sprintf("0.0.0.0:%d", port)
lggr.Info("starting grpc server", "address", addr)

lis, err := net.Listen("tcp", addr)
if err != nil {
return err
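
The updated main.go relies on util.SetupLog returning a level enabler and on util.IsDebug, whose bodies are not part of this diff. A hypothetical sketch of what IsDebug could look like given its call site above (an assumption for illustration, not the repo's actual helper):

// Hypothetical sketch only; the real helper in the util package may differ.
package util

import "go.uber.org/zap/zapcore"

// IsDebug reports whether the configured level enabler admits debug-level output.
func IsDebug(lvl zapcore.LevelEnabler) bool {
	return lvl != nil && lvl.Enabled(zapcore.DebugLevel)
}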
4 changes: 0 additions & 4 deletions metric/mem_store.go
@@ -42,7 +42,6 @@ func (m ms) Get(unescapedName types.MetricName, searchLabels types.Labels, timeO
}
return ret, true, nil
}
fmt.Sprintf(" ---- Get: %v -> %v", name, md.AggregatesOverTime[timeOp])
return md.AggregatesOverTime[timeOp], true, nil
}
// multiple metric vectors match the search criteria
@@ -67,7 +66,6 @@ func (m ms) Get(unescapedName types.MetricName, searchLabels types.Labels, timeO
}
}
}
fmt.Sprintf(" ---- Get: %v -> %v", name, accumulator)
return accumulator, true, nil
}

@@ -82,7 +80,6 @@ func checkDefaultAggregation(aggregation types.AggregationOverVectors) error {

func (m ms) Put(entry types.NewMetricEntry) {
name := escapeName(entry.Name)
fmt.Sprintf(" ---- Put: %v -> %v", name, entry.Value)
if _, found := m.store[name]; !found {
m.store[name] = make(map[types.LabelsHash]types.MetricData)
}
@@ -97,7 +94,6 @@ func (m ms) Put(entry types.NewMetricEntry) {
notStale := util.Filter(md.Data, func(val types.ObservedValue) bool {
return !m.isStale(val.Time, now)
})
fmt.Sprintf("not stale: %v", notStale)
md.Data = append(notStale, types.ObservedValue{
Time: entry.Time,
Value: entry.Value,
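
The deleted lines in this file called fmt.Sprintf and discarded its return value, so they never produced any output (go vet reports "result of fmt.Sprintf call not used"). A minimal illustration, not taken from the repo:

package main

import "fmt"

func main() {
	// Effectively a no-op: Sprintf only returns the formatted string,
	// and the result here is discarded without being printed or logged.
	fmt.Sprintf("Put: %v -> %v", "some_metric", 42)

	// To actually emit output, print the result or use Printf instead.
	fmt.Printf("Put: %v -> %v\n", "some_metric", 42)
}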
53 changes: 22 additions & 31 deletions receiver/receiver.go
@@ -39,17 +39,19 @@ type otlpReceiver struct {

settings *receiver.Settings
metricMemStore types.MemStore
debug bool
}

// newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's
// responsibility to invoke the respective Start*Reception methods as well
// as the various Stop*Reception methods to end it.
func NewOtlpReceiver(cfg *otlpreceiver.Config, set *receiver.Settings, memStore types.MemStore) (*otlpReceiver, error) {
func NewOtlpReceiver(cfg *otlpreceiver.Config, set *receiver.Settings, memStore types.MemStore, debug bool) (*otlpReceiver, error) {
r := &otlpReceiver{
cfg: cfg,
nextMetrics: nil,
settings: set,
metricMemStore: memStore,
debug: debug,
}

var err error
@@ -77,7 +79,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error {
}

if r.nextMetrics != nil {
pmetricotlp.RegisterGRPCServer(r.serverGRPC, New(r.nextMetrics, r.obsrepGRPC, r.metricMemStore))
pmetricotlp.RegisterGRPCServer(r.serverGRPC, New(r.nextMetrics, r.obsrepGRPC, r.metricMemStore, r.debug))
}

r.settings.Logger.Info("Starting GRPC server", zap.String("endpoint", r.cfg.GRPC.NetAddr.Endpoint))
@@ -99,7 +101,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error {

// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (r *otlpReceiver) Start(ctx context.Context, host component.Host) error {
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
if err := r.startGRPCServer(host); err != nil {
return err
}
@@ -108,7 +110,7 @@ func (r *otlpReceiver) Start(ctx context.Context, host component.Host) error {
}

// Shutdown is a method to turn off receiving.
func (r *otlpReceiver) Shutdown(ctx context.Context) error {
func (r *otlpReceiver) Shutdown(_ context.Context) error {
var err error

if r.serverGRPC != nil {
@@ -131,14 +133,16 @@ type Receiver struct {
nextConsumer consumer.Metrics
obsreport *receiverhelper.ObsReport
metricMemStore types.MemStore
debug bool
}

// New creates a new Receiver reference.
func New(nextConsumer consumer.Metrics, obsreport *receiverhelper.ObsReport, memStore types.MemStore) *Receiver {
func New(nextConsumer consumer.Metrics, obsreport *receiverhelper.ObsReport, memStore types.MemStore, debug bool) *Receiver {
return &Receiver{
nextConsumer: nextConsumer,
obsreport: obsreport,
metricMemStore: memStore,
debug: debug,
}
}

@@ -149,8 +153,8 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
if dataPointCount == 0 {
return pmetricotlp.NewExportResponse(), nil
}
fmt.Printf("\n\nData point count: %d\n", dataPointCount)
//fmt.Printf("\n\nData points: %v\n", md.ResourceMetrics())
// using the printf instead of logger makes the metric data nicer in logs
r.p("\n\nData point count: %d\n", dataPointCount)
resLen := md.ResourceMetrics().Len()
for i := 0; i < resLen; i++ {
sm := md.ResourceMetrics().At(i).ScopeMetrics()
@@ -159,8 +163,8 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
mLen := sm.At(j).Metrics().Len()
metrics := sm.At(j).Metrics()
for k := 0; k < mLen; k++ {
fmt.Printf("- name: %+v\n", metrics.At(k).Name())
fmt.Printf(" type: %+v\n", metrics.At(k).Type())
r.p("- name: %+v\n", metrics.At(k).Name())
r.p(" type: %+v\n", metrics.At(k).Type())
var dataPoints pmetric.NumberDataPointSlice
switch metrics.At(k).Type() {
case pmetric.MetricTypeGauge:
@@ -172,10 +176,10 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
}
for l := 0; l < dataPoints.Len(); l++ {
datapoint := dataPoints.At(l)
fmt.Printf(" - time: %+v\n", datapoint.Timestamp())
fmt.Printf(" tags: %+v\n", datapoint.Attributes().AsRaw())
r.p(" - time: %+v\n", datapoint.Timestamp())
r.p(" tags: %+v\n", datapoint.Attributes().AsRaw())
value := math.Max(datapoint.DoubleValue(), float64(datapoint.IntValue()))
fmt.Printf(" value: %+v\n", value)
r.p(" value: %+v\n", value)
r.metricMemStore.Put(types.NewMetricEntry{
Name: types.MetricName(metrics.At(k).Name()),
ObservedValue: types.ObservedValue{
@@ -187,38 +191,25 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
}
}
}
//for k, v := range m {
// fmt.Printf("k=: %+v\n", k)
// fmt.Printf("v=: %+v\n", v)
//}
//for j := 0; j < md.ResourceMetrics().At(i).ScopeMetrics().Len(); i++ {
// md.ResourceMetrics().
// fmt.Printf("222internal: %+v\n", md.ResourceMetrics().At(i).ScopeMetrics().At(j).Metrics())
// //m := md.ResourceMetrics().At(i).Resource().Attributes().AsRaw()
// //for k, v := range m {
// // fmt.Printf("k=: %+v\n", k)
// // fmt.Printf("v=: %+v\n", v)
// //}
//}
}

ctx = r.obsreport.StartMetricsOp(ctx)
err := r.nextConsumer.ConsumeMetrics(ctx, md)
r.obsreport.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err)

// Use appropriate status codes for permanent/non-permanent errors
// If we return the error straightaway, then the grpc implementation will set status code to Unknown
// Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345
// So, convert the error to appropriate grpc status and return the error
// NonPermanent errors will be converted to codes.Unavailable (equivalent to HTTP 503)
// Permanent errors will be converted to codes.InvalidArgument (equivalent to HTTP 400)
if err != nil {
return pmetricotlp.NewExportResponse(), GetStatusFromError(err)
}

return pmetricotlp.NewExportResponse(), nil
}

func (r *Receiver) p(format string, a ...any) {
if r.debug {
fmt.Printf(format, a...)
}
}

func GetStatusFromError(err error) error {
s, ok := status.FromError(err)
if !ok {
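
GetStatusFromError is truncated above; the comment block in Export describes the mapping it performs. A hedged sketch of what that mapping commonly looks like in OTLP receivers (the package clause and exact body are assumptions, and the repo's version may differ):

// Sketch only: convert consumer errors to gRPC status codes as described
// in the comment above (permanent -> InvalidArgument, otherwise Unavailable).
package receiver

import (
	"go.opentelemetry.io/collector/consumer/consumererror"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func GetStatusFromError(err error) error {
	s, ok := status.FromError(err)
	if !ok {
		code := codes.Unavailable
		if consumererror.IsPermanent(err) {
			code = codes.InvalidArgument
		}
		s = status.New(code, err.Error())
	}
	return s.Err()
}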