diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go
index 4c51f8ba330..8797cd5f6b1 100644
--- a/cmd/controller-manager/app/options/options.go
+++ b/cmd/controller-manager/app/options/options.go
@@ -36,6 +36,7 @@ const (
 	defaultMaxRequeueNum       = 15
 	defaultSchedulerName       = "volcano"
 	defaultHealthzAddress      = ":11251"
+	defaultListenAddress       = ":8080"
 	defaultLockObjectNamespace = "volcano-system"
 	defaultPodGroupWorkers     = 5
 	defaultGCWorkers           = 1
@@ -69,6 +70,8 @@ type ServerOption struct {
 	// defaulting to 0.0.0.0:11251
 	HealthzBindAddress string
 	EnableHealthz      bool
+	EnableMetrics      bool
+	ListenAddress      string
 	// To determine whether inherit owner's annotations for pods when create podgroup
 	InheritOwnerAnnotations bool
 	// WorkerThreadsForPG is the number of threads syncing podgroup operations
@@ -113,6 +116,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
 	fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue")
 	fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
 	fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
+	fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default")
+	fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
 	fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
 	fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
 	fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.")
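The new `--enable-metrics` and `--listen-address` flags default to off and `:8080`. A minimal sketch of exercising them through `pflag`; the `NewServerOption` constructor and the import path are assumed from the existing options package, not shown in this diff:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"volcano.sh/volcano/cmd/controller-manager/app/options"
)

func main() {
	// Build the flag set the way the controller manager would and parse the
	// two new flags; NewServerOption is assumed to exist alongside AddFlags.
	s := options.NewServerOption()
	fs := pflag.NewFlagSet("vc-controller-manager", pflag.ExitOnError)
	s.AddFlags(fs, nil)

	if err := fs.Parse([]string{"--enable-metrics=true", "--listen-address=:9090"}); err != nil {
		panic(err)
	}
	fmt.Println(s.EnableMetrics, s.ListenAddress) // true :9090
}
```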
diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go
index ce1db524add..429ea24686c 100644
--- a/cmd/controller-manager/app/server.go
+++ b/cmd/controller-manager/app/server.go
@@ -19,8 +19,12 @@ package app
 import (
 	"context"
 	"fmt"
+	"net/http"
 	"os"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/collectors"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/client-go/informers"
@@ -31,6 +35,7 @@ import (
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/component-base/metrics/legacyregistry"
 	"k8s.io/klog/v2"
 
 	"volcano.sh/apis/pkg/apis/helpers"
@@ -48,13 +53,19 @@ func Run(opt *options.ServerOption) error {
 	if err != nil {
 		return err
 	}
-
 	if opt.EnableHealthz {
 		if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller", opt.CaCertData, opt.CertData, opt.KeyData); err != nil {
 			return err
 		}
 	}
 
+	if opt.EnableMetrics {
+		go func() {
+			http.Handle("/metrics", promHandler())
+			klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
+		}()
+	}
+
 	run := startControllers(config, opt)
 
 	ctx := signals.SetupSignalContext()
@@ -177,3 +188,10 @@ func isControllerEnabled(name string, controllers []string) bool {
 	// if we get here, there was no explicit inclusion or exclusion
 	return hasStar
 }
+
+func promHandler() http.Handler {
+	// Unregister the go and process collectors from the default registry because `legacyregistry.DefaultGatherer` has already registered them and gathering both would duplicate the series.
+	prometheus.DefaultRegisterer.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
+	prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector())
+	return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer}, promhttp.HandlerOpts{}))
+}
diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml
index 6fa8a96cd39..e3168e2e3c0 100644
--- a/installer/helm/chart/volcano/templates/controllers.yaml
+++ b/installer/helm/chart/volcano/templates/controllers.yaml
@@ -155,6 +155,7 @@ spec:
           args:
             - --logtostderr
             - --enable-healthz=true
+            - --enable-metrics=true
            - --leader-elect={{ .Values.custom.leader_elect_enable }}
            {{- if .Values.custom.leader_elect_enable }}
            - --leader-elect-resource-namespace={{ .Release.Namespace }}
@@ -163,3 +164,28 @@ spec:
             - 2>&1
           imagePullPolicy: {{ .Values.basic.image_pull_policy }}
 {{- end }}
+---
+apiVersion: v1
+kind: Service
+metadata:
+  annotations:
+    prometheus.io/path: /metrics
+    prometheus.io/port: "8080"
+    prometheus.io/scrape: "true"
+  name: {{ .Release.Name }}-controllers-service
+  namespace: {{ .Release.Namespace }}
+  labels:
+    app: volcano-controller
+    {{- if .Values.custom.common_labels }}
+    {{- toYaml .Values.custom.common_labels | nindent 4 }}
+    {{- end }}
+spec:
+  ports:
+    - port: 8080
+      protocol: TCP
+      targetPort: 8080
+      name: "metrics"
+  selector:
+    app: volcano-controller
+  type: ClusterIP
+{{- end }}
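The `promHandler` above merges two sources: the controller's own metrics in the client_golang default registry and the client-go/workqueue metrics that land in `legacyregistry`; the go and process collectors are unregistered from the default registry so the merged output has no duplicate series. The helm `Service` then exposes the listener on port 8080 with `prometheus.io` scrape annotations. A standalone sketch of exercising the same gatherer merge outside the controller (the endpoint check is illustrative and assumes the controller's module dependencies):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"k8s.io/component-base/metrics/legacyregistry"
)

func main() {
	// Serve the same merged view promHandler builds: the default registry plus
	// the Kubernetes legacy registry, gathered through one handler.
	h := promhttp.HandlerFor(
		prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer},
		promhttp.HandlerOpts{},
	)

	srv := httptest.NewServer(h)
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.StatusCode, len(body) > 0) // 200 true: text exposition of all registered series
}
```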
diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go
index 0e24a4b6d93..0bcf0b9466a 100644
--- a/pkg/controllers/job/state/completing.go
+++ b/pkg/controllers/job/state/completing.go
@@ -17,9 +17,12 @@ limitations under the License.
 package state
 
 import (
+	"fmt"
+
 	vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
 	"volcano.sh/apis/pkg/apis/bus/v1alpha1"
 	"volcano.sh/volcano/pkg/controllers/apis"
+	"volcano.sh/volcano/pkg/scheduler/metrics"
 )
 
 type completingState struct {
@@ -33,6 +36,7 @@ func (ps *completingState) Execute(action v1alpha1.Action) error {
 			return false
 		}
 		status.State.Phase = vcbatch.Completed
+		metrics.RegisterJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
 		return true
 	})
 }
diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go
index 08131ba2374..e65cfd3730a 100644
--- a/pkg/controllers/job/state/restarting.go
+++ b/pkg/controllers/job/state/restarting.go
@@ -17,9 +17,12 @@ limitations under the License.
 package state
 
 import (
+	"fmt"
+
 	vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
 	"volcano.sh/apis/pkg/apis/bus/v1alpha1"
 	"volcano.sh/volcano/pkg/controllers/apis"
+	"volcano.sh/volcano/pkg/scheduler/metrics"
 )
 
 type restartingState struct {
@@ -34,6 +37,7 @@ func (ps *restartingState) Execute(action v1alpha1.Action) error {
 		if status.RetryCount >= maxRetry {
 			// Failed is the phase that the job is restarted failed reached the maximum number of retries.
 			status.State.Phase = vcbatch.Failed
+			metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
 			return true
 		}
 		total := int32(0)
diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go
index 4c2ade17bb7..18d3a0e2352 100644
--- a/pkg/controllers/job/state/running.go
+++ b/pkg/controllers/job/state/running.go
@@ -17,11 +17,14 @@ limitations under the License.
 package state
 
 import (
+	"fmt"
+
 	v1 "k8s.io/api/core/v1"
 
 	vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
 	"volcano.sh/apis/pkg/apis/bus/v1alpha1"
 	"volcano.sh/volcano/pkg/controllers/apis"
+	"volcano.sh/volcano/pkg/scheduler/metrics"
 )
 
 type runningState struct {
@@ -62,6 +65,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
 		minSuccess := ps.job.Job.Spec.MinSuccess
 		if minSuccess != nil && status.Succeeded >= *minSuccess {
 			status.State.Phase = vcbatch.Completed
+			metrics.RegisterJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
 			return true
 		}
 
@@ -76,6 +80,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
 			if taskStatus, ok := status.TaskStatusCount[task.Name]; ok {
 				if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable {
 					status.State.Phase = vcbatch.Failed
+					metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
 					return true
 				}
 			}
@@ -84,10 +89,13 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
 
 		if minSuccess != nil && status.Succeeded < *minSuccess {
 			status.State.Phase = vcbatch.Failed
+			metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
 		} else if status.Succeeded >= ps.job.Job.Spec.MinAvailable {
 			status.State.Phase = vcbatch.Completed
+			metrics.RegisterJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
 		} else {
 			status.State.Phase = vcbatch.Failed
+			metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
 		}
 		return true
 	}
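Each state transition above labels the new counters with a single combined `<namespace>/<name>` value rather than separate namespace and name labels. A minimal, self-contained sketch of that convention; the counter here is a local stand-in for `jobCompletedPhaseCount` (defined below), and the job name is made up:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Local stand-in for the jobCompletedPhaseCount vector added in this PR.
	completed := prometheus.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "volcano",
		Name:      "job_completed_phase_count",
		Help:      "Number of jobs in completed phase",
	}, []string{"job_name"})

	// The state handlers pass fmt.Sprintf("%s/%s", namespace, name) as the label value.
	ns, name := "default", "tf-mnist" // illustrative values
	completed.WithLabelValues(fmt.Sprintf("%s/%s", ns, name)).Inc()

	// The series is keyed by the combined value, e.g.
	//   volcano_job_completed_phase_count{job_name="default/tf-mnist"} 1
	fmt.Println(testutil.ToFloat64(completed.WithLabelValues("default/tf-mnist"))) // 1
}
```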
diff --git a/pkg/scheduler/metrics/job.go b/pkg/scheduler/metrics/job.go
index 47ef091a698..e5594ab649d 100644
--- a/pkg/scheduler/metrics/job.go
+++ b/pkg/scheduler/metrics/job.go
@@ -37,8 +37,34 @@ var (
 			Help:      "Number of retry counts for one job",
 		}, []string{"job_id"},
 	)
+
+	jobCompletedPhaseCount = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: VolcanoNamespace,
+			Name:      "job_completed_phase_count",
+			Help:      "Number of jobs in completed phase",
+		}, []string{"job_name"},
+	)
+
+	jobFailedPhaseCount = promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: VolcanoNamespace,
+			Name:      "job_failed_phase_count",
+			Help:      "Number of jobs in failed phase",
+		}, []string{"job_name"},
+	)
 )
 
+// RegisterJobCompleted records the number of jobs in completed phase
+func RegisterJobCompleted(jobName string) {
+	jobCompletedPhaseCount.WithLabelValues(jobName).Inc()
+}
+
+// RegisterJobFailed records the number of jobs in failed phase
+func RegisterJobFailed(jobName string) {
+	jobFailedPhaseCount.WithLabelValues(jobName).Inc()
+}
+
 // UpdateJobShare records share for one job
 func UpdateJobShare(jobNs, jobID string, share float64) {
 	jobShare.WithLabelValues(jobNs, jobID).Set(share)
@@ -57,4 +83,6 @@ func DeleteJobMetrics(jobName, queue, namespace string) {
 	unscheduleTaskCount.DeleteLabelValues(jobName)
 	jobShare.DeleteLabelValues(namespace, jobName)
 	jobRetryCount.DeleteLabelValues(jobName)
+	jobCompletedPhaseCount.DeleteLabelValues(namespace + "/" + jobName)
+	jobFailedPhaseCount.DeleteLabelValues(namespace + "/" + jobName)
 }
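`DeleteLabelValues` only removes a series when it receives exactly one value per label of the vector, matching what was used at registration time; with the single `job_name` label above, that is the combined `namespace/name` string. A small standalone sketch of that behaviour (metric and job names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	failed := prometheus.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "volcano",
		Name:      "job_failed_phase_count",
		Help:      "Number of jobs in failed phase",
	}, []string{"job_name"})

	// Register with the combined "namespace/name" value, as the state handlers do.
	failed.WithLabelValues("default/my-job").Inc()

	// Two values against a one-label vector never match, so nothing is deleted.
	fmt.Println(failed.DeleteLabelValues("default", "my-job")) // false
	// The exact value used at registration removes the series.
	fmt.Println(failed.DeleteLabelValues("default/my-job")) // true
}
```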