Skip to content

Commit

Permalink
feat: add volcano jobs phase metric
Browse files Browse the repository at this point in the history
Signed-off-by: Prepmachine4 <prepmachine4@gmail.com>
  • Loading branch information
Prepmachine4 committed Aug 5, 2024
1 parent 0fa8102 commit c1ccb0f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 1 deletion.
5 changes: 5 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
defaultMaxRequeueNum = 15
defaultSchedulerName = "volcano"
defaultHealthzAddress = ":11251"
defaultListenAddress = ":8080"
defaultLockObjectNamespace = "volcano-system"
defaultPodGroupWorkers = 5
defaultGCWorkers = 1
Expand Down Expand Up @@ -69,6 +70,8 @@ type ServerOption struct {
// defaulting to 0.0.0.0:11251
HealthzBindAddress string
EnableHealthz bool
EnableMetrics bool
ListenAddress string
// To determine whether inherit owner's annotations for pods when create podgroup
InheritOwnerAnnotations bool
// WorkerThreadsForPG is the number of threads syncing podgroup operations
Expand Down Expand Up @@ -113,6 +116,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) {
fs.IntVar(&s.MaxRequeueNum, "max-requeue-num", defaultMaxRequeueNum, "The number of times a job, queue or command will be requeued before it is dropped out of the queue")
fs.StringVar(&s.HealthzBindAddress, "healthz-address", defaultHealthzAddress, "The address to listen on for the health check server.")
fs.BoolVar(&s.EnableHealthz, "enable-healthz", false, "Enable the health check; it is false by default")
fs.BoolVar(&s.EnableMetrics, "enable-metrics", false, "Enable the metrics function; it is false by default")
fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.")
fs.BoolVar(&s.InheritOwnerAnnotations, "inherit-owner-annotations", true, "Enable inherit owner annotations for pods when create podgroup; it is enabled by default")
fs.Uint32Var(&s.WorkerThreadsForPG, "worker-threads-for-podgroup", defaultPodGroupWorkers, "The number of threads syncing podgroup operations. The larger the number, the faster the podgroup processing, but requires more CPU load.")
fs.Uint32Var(&s.WorkerThreadsForGC, "worker-threads-for-gc", defaultGCWorkers, "The number of threads for recycling jobs. The larger the number, the faster the job recycling, but requires more CPU load.")
Expand Down
20 changes: 19 additions & 1 deletion cmd/controller-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ package app
import (
"context"
"fmt"
"net/http"
"os"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
Expand All @@ -31,6 +35,7 @@ import (
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"

"volcano.sh/apis/pkg/apis/helpers"
Expand All @@ -48,13 +53,19 @@ func Run(opt *options.ServerOption) error {
if err != nil {
return err
}

if opt.EnableHealthz {
if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller", opt.CaCertData, opt.CertData, opt.KeyData); err != nil {
return err
}
}

if opt.EnableMetrics {
go func() {
http.Handle("/metrics", promHandler())
klog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()
}

run := startControllers(config, opt)

ctx := signals.SetupSignalContext()
Expand Down Expand Up @@ -177,3 +188,10 @@ func isControllerEnabled(name string, controllers []string) bool {
// if we get here, there was no explicit inclusion or exclusion
return hasStar
}

func promHandler() http.Handler {
// Unregister go and process related collector because it's duplicated and `legacyregistry.DefaultGatherer` also has registered them.
prometheus.DefaultRegisterer.Unregister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector())
return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(prometheus.Gatherers{prometheus.DefaultGatherer, legacyregistry.DefaultGatherer}, promhttp.HandlerOpts{}))
}
26 changes: 26 additions & 0 deletions installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ spec:
args:
- --logtostderr
- --enable-healthz=true
- --enable-metrics=true
- --leader-elect={{ .Values.custom.leader_elect_enable }}
{{- if .Values.custom.leader_elect_enable }}
- --leader-elect-resource-namespace={{ .Release.Namespace }}
Expand All @@ -163,3 +164,28 @@ spec:
- 2>&1
imagePullPolicy: {{ .Values.basic.image_pull_policy }}
{{- end }}
---
apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /metrics
prometheus.io/port: "8080"
prometheus.io/scrape: "true"
name: {{ .Release.Name }}-controllers-service
namespace: {{ .Release.Namespace }}
labels:
app: volcano-controller
{{- if .Values.custom.common_labels }}
{{- toYaml .Values.custom.common_labels | nindent 4 }}
{{- end }}
spec:
ports:
- port: 8080
protocol: TCP
targetPort: 8080
name: "metrics"
selector:
app: volcano-controller
type: ClusterIP
{{- end }}
4 changes: 4 additions & 0 deletions pkg/controllers/job/state/completing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package state

import (
"fmt"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/scheduler/metrics"
)

type completingState struct {
Expand All @@ -33,6 +36,7 @@ func (ps *completingState) Execute(action v1alpha1.Action) error {
return false
}
status.State.Phase = vcbatch.Completed
metrics.RegisterJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
return true
})
}
4 changes: 4 additions & 0 deletions pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package state

import (
"fmt"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/scheduler/metrics"
)

type restartingState struct {
Expand All @@ -34,6 +37,7 @@ func (ps *restartingState) Execute(action v1alpha1.Action) error {
if status.RetryCount >= maxRetry {
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
status.State.Phase = vcbatch.Failed
metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
return true
}
total := int32(0)
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ limitations under the License.
package state

import (
"fmt"

v1 "k8s.io/api/core/v1"

vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/scheduler/metrics"
)

type runningState struct {
Expand Down Expand Up @@ -62,6 +65,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
minSuccess := ps.job.Job.Spec.MinSuccess
if minSuccess != nil && status.Succeeded >= *minSuccess {
status.State.Phase = vcbatch.Completed
metrics.RegisterJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
return true
}

Expand All @@ -76,6 +80,7 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
if taskStatus, ok := status.TaskStatusCount[task.Name]; ok {
if taskStatus.Phase[v1.PodSucceeded] < *task.MinAvailable {
status.State.Phase = vcbatch.Failed
metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
return true
}
}
Expand All @@ -84,10 +89,13 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {

if minSuccess != nil && status.Succeeded < *minSuccess {
status.State.Phase = vcbatch.Failed
metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
} else if status.Succeeded >= ps.job.Job.Spec.MinAvailable {
status.State.Phase = vcbatch.Completed
metrics.RegisterJobCompleted(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
} else {
status.State.Phase = vcbatch.Failed
metrics.RegisterJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name))
}
return true
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/scheduler/metrics/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,34 @@ var (
Help: "Number of retry counts for one job",
}, []string{"job_id"},
)

jobCompletedPhaseCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: VolcanoNamespace,
Name: "job_completed_phase_count",
Help: "Number of jobs in completed phase",
}, []string{"job_name"},
)

jobFailedPhaseCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: VolcanoNamespace,
Name: "job_failed_phase_count",
Help: "Number of jobs in failed phase",
}, []string{"job_name"},
)
)

// RegisterJobCompleted records the number of jobs in completed phase
func RegisterJobCompleted(jobName string) {
jobCompletedPhaseCount.WithLabelValues(jobName).Inc()
}

// RegisterJobFailed records the number of jobs in failed phase
func RegisterJobFailed(jobName string) {
jobFailedPhaseCount.WithLabelValues(jobName).Inc()
}

// UpdateJobShare records share for one job
func UpdateJobShare(jobNs, jobID string, share float64) {
jobShare.WithLabelValues(jobNs, jobID).Set(share)
Expand All @@ -57,4 +83,6 @@ func DeleteJobMetrics(jobName, queue, namespace string) {
unscheduleTaskCount.DeleteLabelValues(jobName)
jobShare.DeleteLabelValues(namespace, jobName)
jobRetryCount.DeleteLabelValues(jobName)
jobCompletedPhaseCount.DeleteLabelValues(namespace, jobName)
jobFailedPhaseCount.DeleteLabelValues(namespace, jobName)
}

0 comments on commit c1ccb0f

Please sign in to comment.