Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add volcano jobs phase metric #3650

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e_spark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ jobs:
run: |
eval $(minikube docker-env)
make TAG=latest update-development-yaml
sed -i 's/imagePullPolicy: Always/imagePullPolicy: IfNotPresent/g' installer/volcano-development.yaml
make TAG=latest images
docker images | grep volcano
cat ./installer/volcano-development.yaml | grep image:
Expand Down
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
defaultMaxRequeueNum = 15
defaultSchedulerName = "volcano"
defaultHealthzAddress = ":11251"
defaultListenAddress = ":8081"
defaultLockObjectNamespace = "volcano-system"
defaultPodGroupWorkers = 5
defaultGCWorkers = 1
Expand Down Expand Up @@ -70,6 +71,8 @@ type ServerOption struct {
// defaulting to 0.0.0.0:11251
HealthzBindAddress string
EnableHealthz bool
EnableMetrics bool
ListenAddress string
// To determine whether inherit owner's annotations for pods when create podgroup
InheritOwnerAnnotations bool
// WorkerThreadsForPG is the number of threads syncing podgroup operations
Expand Down Expand Up @@ -114,6 +117,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.")
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestAddFlags(t *testing.T) {
SchedulerNames: []string{"volcano", "volcano2"},
MaxRequeueNum: defaultMaxRequeueNum,
HealthzBindAddress: ":11251",
ListenAddress: defaultListenAddress,
InheritOwnerAnnotations: true,
LeaderElection: config.LeaderElectionConfiguration{
LeaderElect: true,
Expand Down
10 changes: 9 additions & 1 deletion cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package app
import (
"context"
"fmt"
"net/http"
"os"

v1 "k8s.io/api/core/v1"
Expand All @@ -40,6 +41,7 @@ import (
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/kube"
"volcano.sh/volcano/pkg/signals"
commonutil "volcano.sh/volcano/pkg/util"
)

// Run the controller.
Expand All @@ -48,13 +50,19 @@ func Run(opt *options.ServerOption) error {
if err != nil {
return err
}

if opt.EnableHealthz {
if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller", opt.CaCertData, opt.CertData, opt.KeyData); err != nil {
return err
}
}

if opt.EnableMetrics {
go func() {
http.Handle("/metrics", commonutil.PromHandler())
klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()
}

run := startControllers(config, opt)

ctx := signals.SetupSignalContext()
Expand Down
13 changes: 1 addition & 12 deletions cmd/scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ import (
"volcano.sh/volcano/pkg/signals"
commonutil "volcano.sh/volcano/pkg/util"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"

// Register gcp auth
Expand Down Expand Up @@ -75,7 +71,7 @@ func Run(opt *options.ServerOption) error {

if opt.EnableMetrics {
go func() {
http.Handle("/metrics", promHandler())
http.Handle("/metrics", commonutil.PromHandler())
klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()
}
Expand Down Expand Up @@ -146,10 +142,3 @@ func Run(opt *options.ServerOption) error {
})
return fmt.Errorf("lost lease")
}

// promHandler returns the HTTP handler used to serve the "/metrics" endpoint.
// It gathers from both the default Prometheus gatherer and
// legacyregistry.DefaultGatherer.
func promHandler() http.Handler {
	defaultRegistry := prometheus.DefaultRegisterer

	// Unregister go and process related collector because it's duplicated and `legacyregistry.DefaultGatherer` also has registered them.
	processCollector := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})
	defaultRegistry.Unregister(processCollector)
	goCollector := collectors.NewGoCollector()
	defaultRegistry.Unregister(goCollector)

	// Serve the union of both gatherers, and instrument the handler itself
	// against the default registry.
	combined := prometheus.Gatherers{
		prometheus.DefaultGatherer,
		legacyregistry.DefaultGatherer,
	}
	inner := promhttp.HandlerFor(combined, promhttp.HandlerOpts{})
	return promhttp.InstrumentMetricHandler(defaultRegistry, inner)
}
29 changes: 28 additions & 1 deletion installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ spec:
args:
- --logtostderr
- --enable-healthz=true
{{- if .Values.custom.controller_metrics_enable }}
- --enable-metrics=true
{{- end }}
- --leader-elect={{ .Values.custom.leader_elect_enable }}
{{- if $scheduler_name }}
- --scheduler-name={{- $scheduler_name }}
Expand All @@ -170,4 +173,28 @@ spec:
securityContext:
{{- toYaml .Values.custom.controller_default_csc | nindent 14 }}
{{- end }}
{{- end }}
---
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/port: "8081"
prometheus.io/scrape: "true"
name: {{ .Release.Name }}-controllers-service
namespace: {{ .Release.Namespace }}
labels:
app: volcano-controller
{{- if .Values.custom.common_labels }}
{{- toYaml .Values.custom.common_labels | nindent 4 }}
{{- end }}
spec:
ports:
- port: 8081
protocol: TCP
targetPort: 8081
name: "metrics"
selector:
app: volcano-controller
type: ClusterIP
{{- end }}
2 changes: 2 additions & 0 deletions installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ spec:
- --scheduler-name={{- $scheduler_name }}
{{- end }}
- --enable-healthz=true
{{- if .Values.custom.scheduler_metrics_enable }}
- --enable-metrics=true
{{- end }}
- --leader-elect={{ .Values.custom.leader_elect_enable }}
{{- if .Values.custom.leader_elect_enable }}
- --leader-elect-resource-namespace={{ .Release.Namespace }}
Expand Down
2 changes: 2 additions & 0 deletions installer/helm/chart/volcano/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ custom:
admission_replicas: 1
controller_enable: true
controller_replicas: 1
controller_metrics_enable: true
scheduler_enable: true
scheduler_replicas: 1
scheduler_metrics_enable: true
scheduler_name: ~
leader_elect_enable: false
enabled_admissions: "/jobs/mutate,/jobs/validate,/podgroups/mutate,/pods/validate,/pods/mutate,/queues/mutate,/queues/validate"
Expand Down
23 changes: 23 additions & 0 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4331,6 +4331,28 @@ roleRef:
apiGroup: rbac.authorization.k8s.io
---
# Source: volcano/templates/controllers.yaml
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/port: "8081"
prometheus.io/scrape: "true"
name: volcano-controllers-service
namespace: volcano-system
labels:
app: volcano-controller
spec:
ports:
- port: 8081
protocol: TCP
targetPort: 8081
name: "metrics"
selector:
app: volcano-controller
type: ClusterIP
---
# Source: volcano/templates/controllers.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
Expand All @@ -4356,6 +4378,7 @@ spec:
args:
- --logtostderr
- --enable-healthz=true
- --enable-metrics=true
- --leader-elect=false
- -v=4
- 2>&1
Expand Down
3 changes: 0 additions & 3 deletions installer/volcano-monitoring-latest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,6 @@ spec:
timeoutSeconds: 5
dnsPolicy: ClusterFirst

nodeSelector:
node.kubernetes.io/instance-type: controlpanel

serviceAccountName: kube-state-metrics
---
# Source: volcano/templates/grafana.yaml
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"volcano.sh/volcano/pkg/controllers/apis"
jobcache "volcano.sh/volcano/pkg/controllers/cache"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/state"
)

func (cc *jobcontroller) addCommand(obj interface{}) {
Expand Down Expand Up @@ -133,6 +134,9 @@ func (cc *jobcontroller) deleteJob(obj interface{}) {
klog.Errorf("Failed to delete job <%s/%s>: %v in cache",
job.Namespace, job.Name, err)
}

// Delete job metrics
state.DeleteJobMetrics(fmt.Sprintf("%s/%s", job.Namespace, job.Name), job.Spec.Queue)
}

func (cc *jobcontroller) addPod(obj interface{}) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controllers/job/state/completing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package state

import (
"fmt"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
Expand All @@ -33,6 +35,7 @@ func (ps *completingState) Execute(action v1alpha1.Action) error {
return false
}
status.State.Phase = vcbatch.Completed
UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
})
}
39 changes: 39 additions & 0 deletions pkg/controllers/job/state/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package state

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"volcano.sh/volcano/pkg/scheduler/metrics"
)

// jobPhaseLabelNames lists the label names shared by the job phase counters,
// in the order the label values are supplied.
var jobPhaseLabelNames = []string{"job_name", "queue_name"}

// jobCompletedPhaseCount is the counter vector "job_completed_phase_count",
// partitioned by job name and queue name.
var jobCompletedPhaseCount = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: metrics.VolcanoNamespace,
		Name:      "job_completed_phase_count",
		Help:      "Number of job completed phase",
	},
	jobPhaseLabelNames,
)

// jobFailedPhaseCount is the counter vector "job_failed_phase_count",
// partitioned by job name and queue name.
var jobFailedPhaseCount = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: metrics.VolcanoNamespace,
		Name:      "job_failed_phase_count",
		Help:      "Number of job failed phase",
	},
	jobPhaseLabelNames,
)

func UpdateJobCompleted(jobName, queueName string) {
jobCompletedPhaseCount.WithLabelValues(jobName, queueName).Inc()
}

func UpdateJobFailed(jobName, queueName string) {
jobFailedPhaseCount.WithLabelValues(jobName, queueName).Inc()
}

func DeleteJobMetrics(jobName, queueName string) {
Prepmachine4 marked this conversation as resolved.
Show resolved Hide resolved
jobCompletedPhaseCount.DeleteLabelValues(jobName, queueName)
jobFailedPhaseCount.DeleteLabelValues(jobName, queueName)
}
3 changes: 3 additions & 0 deletions pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package state

import (
"fmt"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
Expand All @@ -34,6 +36,7 @@ func (ps *restartingState) Execute(action v1alpha1.Action) error {
if status.RetryCount >= maxRetry {
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
}
total := int32(0)
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package state

import (
"fmt"

v1 "k8s.io/api/core/v1"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand Down Expand Up @@ -62,6 +64,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
minSuccess := ps.job.Job.Spec.MinSuccess
if minSuccess != nil && status.Succeeded >= *minSuccess {
status.State.Phase = vcbatch.Completed
UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
}

Expand All @@ -76,6 +79,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
if taskStatus, ok := status.TaskStatusCount[task.Name]; ok {
if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable {
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
}
}
Expand All @@ -84,10 +88,13 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {

if minSuccess != nil && status.Succeeded < *minSuccess {
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
} else if status.Succeeded >= ps.job.Job.Spec.MinAvailable {
status.State.Phase = vcbatch.Completed
UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
} else {
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
}
return true
}
Expand Down
1 change: 0 additions & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
cpuinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/nodeinfo/v1alpha1"
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
Expand Down
13 changes: 13 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package util

import (
"net/http"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/component-base/config"
"k8s.io/component-base/metrics/legacyregistry"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
Expand Down Expand Up @@ -47,3 +53,10 @@ func LeaderElectionDefault(l *config.LeaderElectionConfiguration) {
l.ResourceLock = resourcelock.LeasesResourceLock
l.ResourceNamespace = defaultLockObjectNamespace
}

// PromHandler returns the HTTP handler that serves Prometheus metrics gathered
// from both prometheus.DefaultGatherer and legacyregistry.DefaultGatherer.
func PromHandler() http.Handler {
	registry := prometheus.DefaultRegisterer

	// Unregister go and process related collector because it's duplicated and `legacyregistry.DefaultGatherer` also has registered them.
	registry.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
	registry.Unregister(collectors.NewGoCollector())

	gatherers := prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer}
	metricsHandler := promhttp.HandlerFor(gatherers, promhttp.HandlerOpts{})

	// Instrument the metrics endpoint itself against the default registry.
	return promhttp.InstrumentMetricHandler(registry, metricsHandler)
}
Loading