Skip to content

Commit

Permalink
feat: add volcano job completed and failed phase metric
Browse files Browse the repository at this point in the history
Signed-off-by: Prepmachine4 <prepmachine4@gmail.com>
  • Loading branch information
Prepmachine4 committed Oct 16, 2024
1 parent 3e5e026 commit 8a1a43e
Show file tree
Hide file tree
Showing 16 changed files with 137 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/workflows/e2e_spark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ jobs:
run: |
eval $(minikube docker-env)
make TAG=latest update-development-yaml
sed -i 's/imagePullPolicy: Always/imagePullPolicy: IfNotPresent/g' installer/volcano-development.yaml
make TAG=latest images
docker images | grep volcano
cat ./installer/volcano-development.yaml | grep image:
Expand Down
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
defaultMaxRequeueNum = 15
defaultSchedulerName = "volcano"
defaultHealthzAddress = ":11251"
defaultListenAddress = ":8081"
defaultLockObjectNamespace = "volcano-system"
defaultPodGroupWorkers = 5
defaultGCWorkers = 1
Expand Down Expand Up @@ -70,6 +71,8 @@ type ServerOption struct {
// defaulting to 0.0.0.0:11251
HealthzBindAddress string
EnableHealthz bool
EnableMetrics bool
ListenAddress string
// To determine whether inherit owner's annotations for pods when create podgroup
InheritOwnerAnnotations bool
// WorkerThreadsForPG is the number of threads syncing podgroup operations
Expand Down Expand Up @@ -114,6 +117,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.")
Expand Down
1 change: 1 addition & 0 deletions cmd/controller-manager/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestAddFlags(t *testing.T) {
SchedulerNames: []string{"volcano", "volcano2"},
MaxRequeueNum: defaultMaxRequeueNum,
HealthzBindAddress: ":11251",
ListenAddress: defaultListenAddress,
InheritOwnerAnnotations: true,
LeaderElection: config.LeaderElectionConfiguration{
LeaderElect: true,
Expand Down
10 changes: 9 additions & 1 deletion cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package app
import (
"context"
"fmt"
"net/http"
"os"

v1 "k8s.io/api/core/v1"
Expand All @@ -40,6 +41,7 @@ import (
"volcano.sh/volcano/pkg/controllers/framework"
"volcano.sh/volcano/pkg/kube"
"volcano.sh/volcano/pkg/signals"
commonutil "volcano.sh/volcano/pkg/util"
)

// Run the controller.
Expand All @@ -48,13 +50,19 @@ func Run(opt *options.ServerOption) error {
if err != nil {
return err
}

if opt.EnableHealthz {
if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller", opt.CaCertData, opt.CertData, opt.KeyData); err != nil {
return err
}
}

if opt.EnableMetrics {
go func() {
http.Handle("/metrics", commonutil.PromHandler())
klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()
}

run := startControllers(config, opt)

ctx := signals.SetupSignalContext()
Expand Down
13 changes: 1 addition & 12 deletions cmd/scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ import (
"volcano.sh/volcano/pkg/signals"
commonutil "volcano.sh/volcano/pkg/util"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"

// Register gcp auth
Expand Down Expand Up @@ -75,7 +71,7 @@ func Run(opt *options.ServerOption) error {

if opt.EnableMetrics {
go func() {
http.Handle("/metrics", promHandler())
http.Handle("/metrics", commonutil.PromHandler())
klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()
}
Expand Down Expand Up @@ -146,10 +142,3 @@ func Run(opt *options.ServerOption) error {
})
return fmt.Errorf("lost lease")
}

func promHandler() http.Handler {
// Unregister go and process related collector because it's duplicated and `legacyregistry.DefaultGatherer` also has registered them.
prometheus.DefaultRegisterer.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector())
return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer}, promhttp.HandlerOpts{}))
}
29 changes: 28 additions & 1 deletion installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ spec:
args:
- --logtostderr
- --enable-healthz=true
{{- if .Values.custom.controller_metrics_enable }}
- --enable-metrics=true
{{- end }}
- --leader-elect={{ .Values.custom.leader_elect_enable }}
{{- if .Values.custom.leader_elect_enable }}
- --leader-elect-resource-namespace={{ .Release.Namespace }}
Expand All @@ -166,4 +169,28 @@ spec:
securityContext:
{{- toYaml .Values.custom.controller_default_csc | nindent 14 }}
{{- end }}
{{- end }}
---
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/port: "8081"
prometheus.io/scrape: "true"
name: {{ .Release.Name }}-controllers-service
namespace: {{ .Release.Namespace }}
labels:
app: volcano-controller
{{- if .Values.custom.common_labels }}
{{- toYaml .Values.custom.common_labels | nindent 4 }}
{{- end }}
spec:
ports:
- port: 8081
protocol: TCP
targetPort: 8081
name: "metrics"
selector:
app: volcano-controller
type: ClusterIP
{{- end }}
2 changes: 2 additions & 0 deletions installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ spec:
- --logtostderr
- --scheduler-conf=/volcano.scheduler/{{base .Values.basic.scheduler_config_file}}
- --enable-healthz=true
{{- if .Values.custom.scheduler_metrics_enable }}
- --enable-metrics=true
{{- end }}
- --leader-elect={{ .Values.custom.leader_elect_enable }}
{{- if .Values.custom.leader_elect_enable }}
- --leader-elect-resource-namespace={{ .Release.Namespace }}
Expand Down
2 changes: 2 additions & 0 deletions installer/helm/chart/volcano/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ custom:
admission_replicas: 1
controller_enable: true
controller_replicas: 1
controller_metrics_enable: true
scheduler_enable: true
scheduler_replicas: 1
scheduler_metrics_enable: true
leader_elect_enable: false
enabled_admissions: "/jobs/mutate,/jobs/validate,/podgroups/mutate,/pods/validate,/pods/mutate,/queues/mutate,/queues/validate"

Expand Down
23 changes: 23 additions & 0 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4331,6 +4331,28 @@ roleRef:
apiGroup: rbac.authorization.k8s.io
---
# Source: volcano/templates/controllers.yaml
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/port: "8081"
prometheus.io/scrape: "true"
name: volcano-controllers-service
namespace: volcano-system
labels:
app: volcano-controller
spec:
ports:
- port: 8081
protocol: TCP
targetPort: 8081
name: "metrics"
selector:
app: volcano-controller
type: ClusterIP
---
# Source: volcano/templates/controllers.yaml
kind: Deployment
apiVersion: apps/v1
metadata:
Expand All @@ -4356,6 +4378,7 @@ spec:
args:
- --logtostderr
- --enable-healthz=true
- --enable-metrics=true
- --leader-elect=false
- -v=4
- 2>&1
Expand Down
3 changes: 0 additions & 3 deletions installer/volcano-monitoring-latest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,6 @@ spec:
timeoutSeconds: 5
dnsPolicy: ClusterFirst

nodeSelector:
node.kubernetes.io/instance-type: controlpanel

serviceAccountName: kube-state-metrics
---
# Source: volcano/templates/grafana.yaml
Expand Down
3 changes: 3 additions & 0 deletions pkg/controllers/job/state/completing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package state

import (
"fmt"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
Expand All @@ -33,6 +35,7 @@ func (ps *completingState) Execute(action v1alpha1.Action) error {
return false
}
status.State.Phase = vcbatch.Completed
UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
})
}
39 changes: 39 additions & 0 deletions pkg/controllers/job/state/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package state

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"volcano.sh/volcano/pkg/scheduler/metrics"
)

var (
jobCompletedPhaseCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: metrics.VolcanoNamespace,
Name: "job_completed_phase_count",
Help: "Number of job completed phase",
}, []string{"job_name", "queue_name"},
)

jobFailedPhaseCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: metrics.VolcanoNamespace,
Name: "job_failed_phase_count",
Help: "Number of job failed phase",
}, []string{"job_name", "queue_name"},
)
)

func UpdateJobCompleted(jobName, queueName string) {
jobCompletedPhaseCount.WithLabelValues(jobName, queueName).Inc()
}

func UpdateJobFailed(jobName, queueName string) {
jobFailedPhaseCount.WithLabelValues(jobName, queueName).Inc()
}

func DeleteJobMetrics(jobName, queueName string) {
jobCompletedPhaseCount.DeleteLabelValues(jobName, queueName)
jobFailedPhaseCount.DeleteLabelValues(jobName, queueName)
}
3 changes: 3 additions & 0 deletions pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package state

import (
"fmt"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
Expand All @@ -34,6 +36,7 @@ func (ps *restartingState) Execute(action v1alpha1.Action) error {
if status.RetryCount >= maxRetry {
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
}
total := int32(0)
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package state

import (
"fmt"

v1 "k8s.io/api/core/v1"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand Down Expand Up @@ -62,6 +64,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
minSuccess := ps.job.Job.Spec.MinSuccess
if minSuccess != nil && status.Succeeded >= *minSuccess {
status.State.Phase = vcbatch.Completed
UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
}

Expand All @@ -76,6 +79,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
if taskStatus, ok := status.TaskStatusCount[task.Name]; ok {
if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable {
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
return true
}
}
Expand All @@ -84,10 +88,13 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {

if minSuccess != nil && status.Succeeded < *minSuccess {
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
} else if status.Succeeded >= ps.job.Job.Spec.MinAvailable {
status.State.Phase = vcbatch.Completed
UpdateJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
} else {
status.State.Phase = vcbatch.Failed
UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue)
}
return true
}
Expand Down
1 change: 0 additions & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ import (
vcinformer "volcano.sh/apis/pkg/client/informers/externalversions"
cpuinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/nodeinfo/v1alpha1"
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
Expand Down
13 changes: 13 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package util

import (
"net/http"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/component-base/config"
"k8s.io/component-base/metrics/legacyregistry"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
Expand Down Expand Up @@ -47,3 +53,10 @@ func LeaderElectionDefault(l *config.LeaderElectionConfiguration) {
l.ResourceLock = resourcelock.LeasesResourceLock
l.ResourceNamespace = defaultLockObjectNamespace
}

func PromHandler() http.Handler {
// Unregister go and process related collector because it's duplicated and `legacyregistry.DefaultGatherer` also has registered them.
prometheus.DefaultRegisterer.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector())
return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer}, promhttp.HandlerOpts{}))
}

0 comments on commit 8a1a43e

Please sign in to comment.