Skip to content

Commit

Permalink
Added Job error handling.
Browse files Browse the repository at this point in the history
Signed-off-by: Klaus Ma <mada3@huawei.com>
  • Loading branch information
Klaus Ma authored and k82cn committed Jan 19, 2019
1 parent 59b0a5c commit 26b05f5
Show file tree
Hide file tree
Showing 21 changed files with 792 additions and 401 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cli:
generate-code:
go build -o ${BIN_DIR}/deepcopy-gen ./cmd/deepcopy-gen/
${BIN_DIR}/deepcopy-gen -i ./pkg/apis/batch/v1alpha1/ -O zz_generated.deepcopy
${BIN_DIR}/deepcopy-gen -i ./pkg/apis/core/v1alpha1/ -O zz_generated.deepcopy
${BIN_DIR}/deepcopy-gen -i ./pkg/apis/bus/v1alpha1/ -O zz_generated.deepcopy

e2e-test:
./hack/run-e2e.sh
Expand Down
36 changes: 2 additions & 34 deletions config/crds/batch_v1alpha1_job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,13 @@ spec:
input:
description: The volume mount for input of Job
properties:
claim:
volumeClaim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
mountPropagation:
description: mountPropagation determines how mounts are propagated
from the host to container and the other way around. When not
set, MountPropagationNone is used. This field is beta in 1.10.
type: string
name:
description: This must match the Name of a Volume.
type: string
readOnly:
description: Mounted read-only if true, read-write otherwise (false
or unspecified). Defaults to false.
type: boolean
subPath:
description: Path within the volume from which the container's volume
should be mounted. Defaults to "" (volume's root).
type: string
required:
- mountPath
type: object
Expand All @@ -63,29 +47,13 @@ spec:
output:
description: The volume mount for output of Job
properties:
claim:
volumeClaim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
mountPropagation:
description: mountPropagation determines how mounts are propagated
from the host to container and the other way around. When not
set, MountPropagationNone is used. This field is beta in 1.10.
type: string
name:
description: This must match the Name of a Volume.
type: string
readOnly:
description: Mounted read-only if true, read-write otherwise (false
or unspecified). Defaults to false.
type: boolean
subPath:
description: Path within the volume from which the container's volume
should be mounted. Defaults to "" (volume's root).
type: string
required:
- mountPath
type: object
Expand Down
5 changes: 4 additions & 1 deletion example/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ metadata:
name: test-job
spec:
minAvailable: 3
policies:
- event: PodEvicted
action: RestartJob
input:
mountPath: "/myinput"
output:
mountPath: "/myoutput"
claim:
volumeClaim:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "my-storage-class"
resources:
Expand Down
28 changes: 17 additions & 11 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,20 @@ type JobSpec struct {

// VolumeSpec defines the specification of Volume, e.g. PVC
type VolumeSpec struct {
v1.VolumeMount `json:",inline"`
// Path within the container at which the volume should be mounted. Must
// not contain ':'.
MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`

// VolumeClaim defines the PVC used by the VolumeMount.
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"claim,omitempty" protobuf:"bytes,1,opt,name=claim"`
VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"`
}

// Event represent the phase of Job, e.g. pod-failed.
type Event string

const (
// AllEvent means all event
AllEvents Event = "*"
AnyEvent Event = "*"
// PodFailedEvent is triggered if Pod was failed
PodFailedEvent Event = "PodFailed"
// PodEvictedEvent is triggered if Pod was deleted
Expand Down Expand Up @@ -165,8 +167,8 @@ const (
Completed JobPhase = "Completed"
// Terminating is the phase that the Job is terminated, waiting for releasing pods
Terminating JobPhase = "Terminating"
// Teriminated is the phase that the job is finished unexpected, e.g. events
Teriminated JobPhase = "Terminated"
// Terminated is the phase that the job is finished unexpected, e.g. events
Terminated JobPhase = "Terminated"
)

// JobState contains details for the current state of the job.
Expand All @@ -189,25 +191,29 @@ type JobStatus struct {
// Current state of Job.
State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`

// The minimal available pods to run for this Job
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"`

// The number of pending pods.
// +optional
Pending int32 `json:"pending,omitempty" protobuf:"bytes,2,opt,name=pending"`
Pending int32 `json:"pending,omitempty" protobuf:"bytes,3,opt,name=pending"`

// The number of running pods.
// +optional
Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"`
Running int32 `json:"running,omitempty" protobuf:"bytes,4,opt,name=running"`

// The number of pods which reached phase Succeeded.
// +optional
Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,4,opt,name=succeeded"`
Succeeded int32 `json:"Succeeded,omitempty" protobuf:"bytes,5,opt,name=succeeded"`

// The number of pods which reached phase Failed.
// +optional
Failed int32 `json:"failed,omitempty" protobuf:"bytes,5,opt,name=failed"`
Failed int32 `json:"failed,omitempty" protobuf:"bytes,6,opt,name=failed"`

// The minimal available pods to run for this Job
// The number of pods which reached phase Terminating.
// +optional
MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,6,opt,name=minAvailable"`
Terminating int32 `json:"terminating,omitempty" protobuf:"bytes,7,opt,name=terminating"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
3 changes: 1 addition & 2 deletions pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/bus/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

100 changes: 46 additions & 54 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,10 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
kbinfoext "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions"
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1"

vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
v1corev1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1"
"hpw.cloud/volcano/pkg/apis/helpers"
Expand All @@ -47,6 +35,15 @@ import (
vkbatchlister "hpw.cloud/volcano/pkg/client/listers/batch/v1alpha1"
vkcorelister "hpw.cloud/volcano/pkg/client/listers/bus/v1alpha1"
"hpw.cloud/volcano/pkg/controllers/job/state"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

// Controller the Job Controller type
Expand Down Expand Up @@ -173,15 +170,8 @@ func NewJobController(config *rest.Config) *Controller {
cc.pgSynced = cc.pgInformer.Informer().HasSynced

// Register actions
actionFns := map[vkbatchv1.Action]state.ActionFn{
vkbatchv1.ResumeJobAction: cc.resumeJob,
vkbatchv1.SyncJobAction: cc.syncJob,
vkbatchv1.AbortJobAction: cc.abortJob,
vkbatchv1.TerminateJobAction: cc.terminateJob,
vkbatchv1.RestartJobAction: cc.restartJob,
vkbatchv1.RestartTaskAction: cc.syncJob,
}
state.RegisterActions(actionFns)
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob

return cc
}
Expand All @@ -203,52 +193,54 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
glog.Infof("JobController is running ...... ")
}

type Request struct {
Namespace string
JobName string
PodName string

Event vkbatchv1.Event
Action vkbatchv1.Action

Reason string
Message string
}

func (cc *Controller) worker() {
obj := cache.Pop(cc.eventQueue)
if obj == nil {
glog.Errorf("Fail to pop item from eventQueue")
return
}

req := obj.(*state.Request)
req := obj.(*Request)

if req.Target != nil {
if job, err := cc.jobLister.Jobs(req.Namespace).Get(req.Target.Name); err != nil {
req.Job = job
}
job, err := cc.jobLister.Jobs(req.Namespace).Get(req.JobName)
if err != nil {
return
}

if req.Job == nil {
if req.Pod == nil {
glog.Errorf("Empty data for request %v", req)
return
}
jobs, err := cc.jobLister.List(labels.Everything())
if err != nil {
glog.Errorf("Failed to list Jobs for Pod %v/%v", req.Pod.Namespace, req.Pod.Name)
}

// TODO(k82cn): select by UID instead of loop
ctl := helpers.GetController(req.Pod)
for _, j := range jobs {
if j.UID == ctl {
req.Job = j
break
}
}
st := state.NewState(job)
if st == nil {
glog.Errorf("Invalid state <%s> of Job <%v/%v>",
job.Status.State, job.Namespace, job.Name)
return
}

if req.Job == nil {
glog.Errorf("No Job for request %v from pod %v/%v",
req.Event, req.Pod.Namespace, req.Pod.Name)
return
action := req.Action
if len(action) == 0 {
pod, err := cc.podLister.Pods(req.Namespace).Get(req.PodName)
if err != nil {
pod = nil
}
action = applyPolicies(req.Event, job, pod)
}

st := state.NewState(req)
if err := st.Execute(); err != nil {
glog.Errorf("Failed to handle Job %s/%s: %v",
req.Job.Namespace, req.Job.Name, err)
if err := st.Execute(action, req.Reason, req.Message); err != nil {
glog.Errorf("Failed to handle Job <%s/%s>: %v",
job.Namespace, job.Name, err)
// If any error, requeue it.
// TODO(k82cn): replace with RateLimteQueue
cc.eventQueue.Add(req)
if e := cc.eventQueue.Add(req); e != nil {
glog.Errorf("Failed to reqeueue request <%v>", req)
}
}
}
Loading

0 comments on commit 26b05f5

Please sign in to comment.