diff --git a/.github/workflows/e2e_spark.yaml b/.github/workflows/e2e_spark.yaml index e70bf26be9..e67d7cee2b 100644 --- a/.github/workflows/e2e_spark.yaml +++ b/.github/workflows/e2e_spark.yaml @@ -62,6 +62,7 @@ jobs: run: | eval $(minikube docker-env) make TAG=latest update-development-yaml + sed -i 's/imagePullPolicy: Always/imagePullPolicy: IfNotPresent/g' installer/volcano-development.yaml make TAG=latest images docker images | grep volcano cat ./installer/volcano-development.yaml | grep image: diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 4ed6754f10..273b510379 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -37,6 +37,7 @@ const ( defaultMaxRequeueNum = 15 defaultSchedulerName = "volcano" defaultHealthzAddress = ":11251" + defaultListenAddress = ":8081" defaultLockObjectNamespace = "volcano-system" defaultPodGroupWorkers = 5 defaultGCWorkers = 1 @@ -70,6 +71,8 @@ type ServerOption struct { // defaulting to 0.0.0.0:11251 HealthzBindAddress string EnableHealthz bool + EnableMetrics bool + ListenAddress string // To determine whether inherit owner's annotations for pods when create podgroup InheritOwnerAnnotations bool // WorkerThreadsForPG is the number of threads syncing podgroup operations @@ -114,6 +117,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) { fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue") fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.") fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default") + fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default") + fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default") fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.") fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.") diff --git a/cmd/controller-manager/app/options/options_test.go b/cmd/controller-manager/app/options/options_test.go index a17e71f601..a0ee14a0e7 100644 --- a/cmd/controller-manager/app/options/options_test.go +++ b/cmd/controller-manager/app/options/options_test.go @@ -89,6 +89,7 @@ func TestAddFlags(t *testing.T) { SchedulerNames: []string{"volcano", "volcano2"}, MaxRequeueNum: defaultMaxRequeueNum, HealthzBindAddress: ":11251", + ListenAddress: defaultListenAddress, InheritOwnerAnnotations: true, LeaderElection: config.LeaderElectionConfiguration{ LeaderElect: true, diff --git a/cmd/controller-manager/app/server.go b/cmd/controller-manager/app/server.go index 8b80986647..471d814a82 100644 --- a/cmd/controller-manager/app/server.go +++ b/cmd/controller-manager/app/server.go @@ -19,6 +19,7 @@ package app import ( "context" "fmt" + "net/http" "os" v1 "k8s.io/api/core/v1" @@ -40,6 +41,7 @@ import ( "volcano.sh/volcano/pkg/controllers/framework" "volcano.sh/volcano/pkg/kube" "volcano.sh/volcano/pkg/signals" + commonutil "volcano.sh/volcano/pkg/util" ) // Run the controller. @@ -48,13 +50,19 @@ func Run(opt *options.ServerOption) error { if err != nil { return err } - if opt.EnableHealthz { if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller", opt.CaCertData, opt.CertData, opt.KeyData); err != nil { return err } } + if opt.EnableMetrics { + go func() { + http.Handle("/metrics", commonutil.PromHandler()) + klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil)) + }() + } + run := startControllers(config, opt) ctx := signals.SetupSignalContext() diff --git a/cmd/scheduler/app/server.go b/cmd/scheduler/app/server.go index c797ac4e19..cbe5909d9b 100644 --- a/cmd/scheduler/app/server.go +++ b/cmd/scheduler/app/server.go @@ -31,15 +31,11 @@ import ( "volcano.sh/volcano/pkg/signals" commonutil "volcano.sh/volcano/pkg/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/uuid" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" // Register gcp auth @@ -75,7 +71,7 @@ func Run(opt *options.ServerOption) error { if opt.EnableMetrics { go func() { - http.Handle("/metrics", promHandler()) + http.Handle("/metrics", commonutil.PromHandler()) klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil)) }() } @@ -146,10 +142,3 @@ func Run(opt *options.ServerOption) error { }) return fmt.Errorf("lost lease") } - -func promHandler() http.Handler { - // Unregister go and process related collector because it's duplicated and `legacyregistry.DefaultGatherer` also has registered them. - prometheus.DefaultRegisterer.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector()) - return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer}, promhttp.HandlerOpts{})) -} diff --git a/installer/helm/chart/volcano/templates/controllers.yaml b/installer/helm/chart/volcano/templates/controllers.yaml index 0375f6d5dd..3fc6cf35f5 100644 --- a/installer/helm/chart/volcano/templates/controllers.yaml +++ b/installer/helm/chart/volcano/templates/controllers.yaml @@ -156,6 +156,9 @@ spec: args: - --logtostderr - --enable-healthz=true + {{- if .Values.custom.controller_metrics_enable }} + - --enable-metrics=true + {{- end }} - --leader-elect={{ .Values.custom.leader_elect_enable }} {{- if $scheduler_name }} - --scheduler-name={{- $scheduler_name }} @@ -170,4 +173,28 @@ spec: securityContext: {{- toYaml .Values.custom.controller_default_csc | nindent 14 }} {{- end }} -{{- end }} +--- +apiVersion: v1 +kind: Service +metadata: + annotations: + prometheus.io/path: /metrics + prometheus.io/port: "8081" + prometheus.io/scrape: "true" + name: {{ .Release.Name }}-controllers-service + namespace: {{ .Release.Namespace }} + labels: + app: volcano-controller + {{- if .Values.custom.common_labels }} + {{- toYaml .Values.custom.common_labels | nindent 4 }} + {{- end }} +spec: + ports: + - port: 8081 + protocol: TCP + targetPort: 8081 + name: "metrics" + selector: + app: volcano-controller + type: ClusterIP +{{- end }} \ No newline at end of file diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml index 1e87a8788d..f5ad36863a 100644 --- a/installer/helm/chart/volcano/templates/scheduler.yaml +++ b/installer/helm/chart/volcano/templates/scheduler.yaml @@ -182,7 +182,9 @@ spec: - --scheduler-name={{- $scheduler_name }} {{- end }} - --enable-healthz=true + {{- if .Values.custom.scheduler_metrics_enable }} - --enable-metrics=true + {{- end }} - --leader-elect={{ .Values.custom.leader_elect_enable }} {{- if .Values.custom.leader_elect_enable }} - --leader-elect-resource-namespace={{ .Release.Namespace }} diff --git a/installer/helm/chart/volcano/values.yaml b/installer/helm/chart/volcano/values.yaml index e4128035a5..2da9920ffc 100644 --- a/installer/helm/chart/volcano/values.yaml +++ b/installer/helm/chart/volcano/values.yaml @@ -16,8 +16,10 @@ custom: admission_replicas: 1 controller_enable: true controller_replicas: 1 + controller_metrics_enable: true scheduler_enable: true scheduler_replicas: 1 + scheduler_metrics_enable: true scheduler_name: ~ leader_elect_enable: false enabled_admissions: "/jobs/mutate,/jobs/validate,/podgroups/mutate,/pods/validate,/pods/mutate,/queues/mutate,/queues/validate" diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 6ebe086dce..f15783e40c 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -4331,6 +4331,28 @@ roleRef: apiGroup: rbac.authorization.k8s.io --- # Source: volcano/templates/controllers.yaml +apiVersion: v1 +kind: Service +metadata: + annotations: + prometheus.io/path: /metrics + prometheus.io/port: "8081" + prometheus.io/scrape: "true" + name: volcano-controllers-service + namespace: volcano-system + labels: + app: volcano-controller +spec: + ports: + - port: 8081 + protocol: TCP + targetPort: 8081 + name: "metrics" + selector: + app: volcano-controller + type: ClusterIP +--- +# Source: volcano/templates/controllers.yaml kind: Deployment apiVersion: apps/v1 metadata: @@ -4356,6 +4378,7 @@ spec: args: - --logtostderr - --enable-healthz=true + - --enable-metrics=true - --leader-elect=false - -v=4 - 2>&1 diff --git a/installer/volcano-monitoring-latest.yaml b/installer/volcano-monitoring-latest.yaml index edd44d161e..2e5a4c871f 100644 --- a/installer/volcano-monitoring-latest.yaml +++ b/installer/volcano-monitoring-latest.yaml @@ -446,9 +446,6 @@ spec: timeoutSeconds: 5 dnsPolicy: ClusterFirst - nodeSelector: - node.kubernetes.io/instance-type: controlpanel - serviceAccountName: kube-state-metrics --- # Source: volcano/templates/grafana.yaml diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 979a108d1d..b97a25be29 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -36,6 +36,7 @@ import ( "volcano.sh/volcano/pkg/controllers/apis" jobcache "volcano.sh/volcano/pkg/controllers/cache" jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers" + "volcano.sh/volcano/pkg/controllers/job/state" ) func (cc *jobcontroller) addCommand(obj interface{}) { @@ -133,6 +134,9 @@ func (cc *jobcontroller) deleteJob(obj interface{}) { klog.Errorf("Failed to delete job <%s/%s>: %v in cache", job.Namespace, job.Name, err) } + + // Delete job metrics + state.DeleteJobMetrics(fmt.Sprintf("%s/%s", job.Namespace, job.Name), job.Spec.Queue) } func (cc *jobcontroller) addPod(obj interface{}) { diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go index 0e24a4b6d9..1b7b102e2d 100644 --- a/pkg/controllers/job/state/completing.go +++ b/pkg/controllers/job/state/completing.go @@ -17,6 +17,8 @@ limitations under the License. package state import ( + "fmt" + vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" @@ -33,6 +35,7 @@ func (ps *completingState) Execute(action v1alpha1.Action) error { return false } status.State.Phase = vcbatch.Completed + UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) return true }) } diff --git a/pkg/controllers/job/state/metrics.go b/pkg/controllers/job/state/metrics.go new file mode 100644 index 0000000000..b2531ad046 --- /dev/null +++ b/pkg/controllers/job/state/metrics.go @@ -0,0 +1,39 @@ +package state + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "volcano.sh/volcano/pkg/scheduler/metrics" +) + +var ( + jobCompletedPhaseCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "job_completed_phase_count", + Help: "Number of job completed phase", + }, []string{"job_name", "queue_name"}, + ) + + jobFailedPhaseCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: metrics.VolcanoNamespace, + Name: "job_failed_phase_count", + Help: "Number of job failed phase", + }, []string{"job_name", "queue_name"}, + ) +) + +func UpdateJobCompleted(jobName, queueName string) { + jobCompletedPhaseCount.WithLabelValues(jobName, queueName).Inc() +} + +func UpdateJobFailed(jobName, queueName string) { + jobFailedPhaseCount.WithLabelValues(jobName, queueName).Inc() +} + +func DeleteJobMetrics(jobName, queueName string) { + jobCompletedPhaseCount.DeleteLabelValues(jobName, queueName) + jobFailedPhaseCount.DeleteLabelValues(jobName, queueName) +} diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index 08131ba237..7c7bc4404c 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -17,6 +17,8 @@ limitations under the License. package state import ( + "fmt" + vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" @@ -34,6 +36,7 @@ func (ps *restartingState) Execute(action v1alpha1.Action) error { if status.RetryCount >= maxRetry { // Failed is the phase that the job is restarted failed reached the maximum number of retries. status.State.Phase = vcbatch.Failed + UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) return true } total := int32(0) diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index 4c2ade17bb..ca0c688a2d 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -17,6 +17,8 @@ limitations under the License. package state import ( + "fmt" + v1 "k8s.io/api/core/v1" vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1" @@ -62,6 +64,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error { minSuccess := ps.job.Job.Spec.MinSuccess if minSuccess != nil && status.Succeeded >= *minSuccess { status.State.Phase = vcbatch.Completed + UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) return true } @@ -76,6 +79,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error { if taskStatus, ok := status.TaskStatusCount[task.Name]; ok { if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable { status.State.Phase = vcbatch.Failed + UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) return true } } @@ -84,10 +88,13 @@ func (ps *runningState) Execute(action v1alpha1.Action) error { if minSuccess != nil && status.Succeeded < *minSuccess { status.State.Phase = vcbatch.Failed + UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) } else if status.Succeeded >= ps.job.Job.Spec.MinAvailable { status.State.Phase = vcbatch.Completed + UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) } else { status.State.Phase = vcbatch.Failed + UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) } return true } diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index f2498ee1aa..687f26533d 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -62,7 +62,6 @@ import ( vcinformer "volcano.sh/apis/pkg/client/informers/externalversions" cpuinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/nodeinfo/v1alpha1" vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" - "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/features" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" diff --git a/pkg/util/util.go b/pkg/util/util.go index 1038fd7977..d110e3cb9e 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -1,11 +1,17 @@ package util import ( + "net/http" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/component-base/config" + "k8s.io/component-base/metrics/legacyregistry" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" ) const ( @@ -47,3 +53,10 @@ func LeaderElectionDefault(l *config.LeaderElectionConfiguration) { l.ResourceLock = resourcelock.LeasesResourceLock l.ResourceNamespace = defaultLockObjectNamespace } + +func PromHandler() http.Handler { + // Unregister go and process related collector because it's duplicated and `legacyregistry.DefaultGatherer` also has registered them. + prometheus.DefaultRegisterer.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector()) + return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer}, promhttp.HandlerOpts{})) +}