Skip to content

Commit

Permalink
Merge branch 'kedacore:main' into feat-logging-topic-metadata-kafka-s…
Browse files Browse the repository at this point in the history
…caler
  • Loading branch information
dttung2905 authored Nov 2, 2023
2 parents cefece4 + faf8c9a commit e9cc8ee
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 70 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ New deprecation(s):

- **General**: Fix CVE-2023-45142 in Opentelemetry ([#5089](https://github.com/kedacore/keda/issues/5089))
- **General**: Fix logger in Opentelemetry collector ([#5094](https://github.com/kedacore/keda/issues/5094))
- **General**: Reduce amount of gauge creations for OpenTelemetry metrics ([#5101](https://github.com/kedacore/keda/issues/5101))
- **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789))

## v2.12.0

Expand Down
9 changes: 6 additions & 3 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var (
metricsAPIServerPort int
disableCompression bool
metricsServiceAddr string
profilingAddr string
)

func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsProvider, <-chan struct{}, error) {
Expand Down Expand Up @@ -111,9 +112,10 @@ func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsPro
Cache: ctrlcache.Options{
DefaultNamespaces: namespaces,
},
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
PprofBindAddress: profilingAddr,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
})
if err != nil {
logger.Error(err, "failed to setup manager")
Expand Down Expand Up @@ -231,6 +233,7 @@ func main() {
cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the klog flags
cmd.Flags().IntVar(&metricsAPIServerPort, "port", 8080, "Set the port for the metrics API server")
cmd.Flags().StringVar(&metricsServiceAddr, "metrics-service-address", generateDefaultMetricsServiceAddr(), "The address of the gRPRC Metrics Service Server.")
cmd.Flags().StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.")
cmd.Flags().Float32Var(&adapterClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver")
cmd.Flags().IntVar(&adapterClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver")
cmd.Flags().BoolVar(&disableCompression, "disable-compression", true, "Disable response compression for k8s restAPI in client-go. ")
Expand Down
3 changes: 3 additions & 0 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func main() {
var metricsAddr string
var probeAddr string
var metricsServiceAddr string
var profilingAddr string
var enableLeaderElection bool
var adapterClientRequestQPS float32
var adapterClientRequestBurst int
Expand All @@ -84,6 +85,7 @@ func main() {
pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the prometheus metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
pflag.StringVar(&metricsServiceAddr, "metrics-service-bind-address", ":9666", "The address the gRPRC Metrics Service endpoint binds to.")
pflag.StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.")
pflag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
Expand Down Expand Up @@ -151,6 +153,7 @@ func main() {
DefaultNamespaces: namespaces,
},
HealthProbeBindAddress: probeAddr,
PprofBindAddress: profilingAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "operator.keda.sh",
LeaseDuration: leaseDuration,
Expand Down
3 changes: 3 additions & 0 deletions cmd/webhooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ func init() {
func main() {
var metricsAddr string
var probeAddr string
var profilingAddr string
var webhooksClientRequestQPS float32
var webhooksClientRequestBurst int
var certDir string
var webhooksPort int

pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
pflag.StringVar(&profilingAddr, "profiling-bind-address", "", "The address the profiling would be exposed on.")
pflag.Float32Var(&webhooksClientRequestQPS, "kube-api-qps", 20.0, "Set the QPS rate for throttling requests sent to the apiserver")
pflag.IntVar(&webhooksClientRequestBurst, "kube-api-burst", 30, "Set the burst for throttling requests sent to the apiserver")
pflag.StringVar(&certDir, "cert-dir", "/certs", "Webhook certificates dir to use. Defaults to /certs")
Expand Down Expand Up @@ -96,6 +98,7 @@ func main() {
},
}),
HealthProbeBindAddress: probeAddr,
PprofBindAddress: profilingAddr,
})
if err != nil {
setupLog.Error(err, "unable to start admission webhooks")
Expand Down
186 changes: 119 additions & 67 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,28 @@ var (
otScaledObjectErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otelScalerActiveVal OtelMetricFloat64Val
)

type OtelMetrics struct {
}

type OtelMetricInt64Val struct {
val int64
measurementOption api.MeasurementOption
}

type OtelMetricFloat64Val struct {
val float64
measurementOption api.MeasurementOption
}

func NewOtelMetrics(options ...metric.Option) *OtelMetrics {
// create default options with env
if options == nil {
Expand All @@ -48,14 +65,14 @@ func NewOtelMetrics(options ...metric.Option) *OtelMetrics {
otel.SetMeterProvider(meterProvider)

meter = meterProvider.Meter(meterName)
initCounter()
initMeters()

otel := &OtelMetrics{}
otel.RecordBuildInfo()
return otel
}

func initCounter() {
func initMeters() {
var err error
msg := "create opentelemetry counter failed"

Expand All @@ -78,37 +95,107 @@ func initCounter() {
if err != nil {
otLog.Error(err, msg)
}
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.value",
api.WithDescription("Metric Value used for HPA"),
api.WithFloat64Callback(cback),
api.WithFloat64Callback(ScalerMetricValueCallback),
)
if err != nil {
otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
otLog.Error(err, msg)
}
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.latency",
api.WithDescription("Scaler Metrics Latency"),
api.WithFloat64Callback(cback),
api.WithFloat64Callback(ScalerMetricsLatencyCallback),
)
if err != nil {
otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithFloat64Callback(ScalableObjectLatencyCallback),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithFloat64Callback(ScalerActiveCallback),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Int64ObservableGauge(
"keda.build.info",
api.WithDescription("A metric with a constant '1' value labeled by version, git_commit and goversion from which KEDA was built."),
api.WithInt64Callback(BuildInfoCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
if otelBuildInfoVal.measurementOption != nil {
obsrv.Observe(otelBuildInfoVal.val, otelBuildInfoVal.measurementOption)
}
otelBuildInfoVal = OtelMetricInt64Val{}
return nil
}

// RecordBuildInfo publishes information about KEDA version and runtime info through an info metric (gauge).
func (o *OtelMetrics) RecordBuildInfo() {
opt := api.WithAttributes(
attribute.Key("version").String(version.Version),
attribute.Key("git_commit").String(version.GitCommit),
attribute.Key("goversion").String(runtime.Version()),
attribute.Key("goos").String(runtime.GOOS),
attribute.Key("goarch").String(runtime.GOARCH),
)
otelBuildInfoVal.val = 1
otelBuildInfoVal.measurementOption = opt
}

func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricVal.measurementOption != nil {
obsrv.Observe(otelScalerMetricVal.val, otelScalerMetricVal.measurementOption)
}
otelScalerMetricVal = OtelMetricFloat64Val{}
return nil
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
otelScalerMetricVal.val = value
otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric)
}

func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerMetricsLatencyVal.measurementOption != nil {
obsrv.Observe(otelScalerMetricsLatencyVal.val, otelScalerMetricsLatencyVal.measurementOption)
}
otelScalerMetricsLatencyVal = OtelMetricFloat64Val{}
return nil
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
otelScalerMetricsLatencyVal.val = value
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric)
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelInternalLoopLatencyVal.measurementOption != nil {
obsrv.Observe(otelInternalLoopLatencyVal.val, otelInternalLoopLatencyVal.measurementOption)
}
otelInternalLoopLatencyVal = OtelMetricFloat64Val{}
return nil
}

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
Expand All @@ -123,18 +210,16 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
attribute.Key("type").String(resourceType),
attribute.Key("name").String(name))

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, opt)
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register internal scale loop latency", "namespace", namespace, resourceType, name)
otelInternalLoopLatencyVal.val = value
otelInternalLoopLatencyVal.measurementOption = opt
}

func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
if otelScalerActiveVal.measurementOption != nil {
obsrv.Observe(otelScalerActiveVal.val, otelScalerActiveVal.measurementOption)
}
otelScalerActiveVal = OtelMetricFloat64Val{}
return nil
}

// RecordScalerActive create a measurement of the activity of the scaler
Expand All @@ -144,18 +229,8 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string,
activeVal = 1
}

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
}
otelScalerActiveVal.val = float64(activeVal)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric)
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand Down Expand Up @@ -204,29 +279,6 @@ func (o *OtelMetrics) RecordScaledObjectError(namespace string, scaledObject str
}
}

// RecordBuildInfo publishes information about KEDA version and runtime info through an info metric (gauge).
func (o *OtelMetrics) RecordBuildInfo() {
opt := api.WithAttributes(
attribute.Key("version").String(version.Version),
attribute.Key("git_commit").String(version.GitCommit),
attribute.Key("goversion").String(runtime.Version()),
attribute.Key("goos").String(runtime.GOOS),
attribute.Key("goarch").String(runtime.GOARCH),
)
cback := func(ctx context.Context, obsrv api.Int64Observer) error {
obsrv.Observe(1, opt)
return nil
}
_, err := meter.Int64ObservableGauge(
"keda.build.info",
api.WithDescription("A metric with a constant '1' value labeled by version, git_commit and goversion from which KEDA was built."),
api.WithInt64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register build info")
}
}

func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
Expand Down

0 comments on commit e9cc8ee

Please sign in to comment.