Skip to content

Commit

Permalink
Merge pull request #254 from lminzhw/fix
Browse files Browse the repository at this point in the history
fix some bug about event and workqueue
  • Loading branch information
volcano-sh-bot authored Jul 3, 2019
2 parents cebe8bb + 2f9e119 commit 7cdd739
Show file tree
Hide file tree
Showing 18 changed files with 273 additions and 172 deletions.
4 changes: 2 additions & 2 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
taskNames := map[string]string{}
var totalReplicas int32

if job.Spec.MinAvailable < 0 {
if job.Spec.MinAvailable <= 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("'minAvailable' cannot be less than zero.")
return fmt.Sprintf("'minAvailable' must be greater than zero.")
}

if job.Spec.MaxRetry < 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/admission/admit_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestValidateExecution(t *testing.T) {
},
},
reviewResponse: v1beta1.AdmissionResponse{Allowed: false},
ret: "'minAvailable' cannot be less than zero.",
ret: "'minAvailable' must be greater than zero.",
ExpectErr: true,
},
// maxretry less than zero
Expand Down
12 changes: 11 additions & 1 deletion pkg/controllers/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ package cache
import (
"fmt"
"sync"
"time"

"github.com/golang/glog"

"golang.org/x/time/rate"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -71,9 +75,15 @@ func jobKeyOfPod(pod *v1.Pod) (string, error) {

// New creates and returns a new job Cache.
func New() Cache {
queue := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 180*time.Second),
// 10 qps, 100 bucket size. This only limits retry speed, and it is only the overall factor (not per item).
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)

return &jobCache{
jobs: map[string]*apis.JobInfo{},
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(queue),
}
}

Expand Down
25 changes: 17 additions & 8 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8scontroller "k8s.io/kubernetes/pkg/controller"

kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
Expand Down Expand Up @@ -90,6 +91,8 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM

if len(errs) != 0 {
glog.Errorf("failed to kill pods for job %s/%s, with err %+v", job.Namespace, job.Name, errs)
cc.recorder.Event(job, v1.EventTypeWarning, k8scontroller.FailedDeletePodReason,
fmt.Sprintf("Error deleting pods: %+v", errs))
return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
}

Expand Down Expand Up @@ -154,10 +157,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.Update
job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

if job, err := cc.initJobStatus(job); err != nil {
if update, job, err := cc.initJobStatus(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.JobStatusError),
fmt.Sprintf("Failed to initialize job status, err: %v", err))
return err
} else if update {
return nil
}

if err := cc.pluginOnJobAdd(job); err != nil {
Expand Down Expand Up @@ -276,7 +281,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
// So gang-scheduling could schedule the Job successfully
glog.Errorf("Failed to create pod %s for Job %s, err %#v",
pod.Name, job.Name, err)
creationErrs = append(creationErrs, err)
creationErrs = append(creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))
} else {
if err != nil && apierrors.IsAlreadyExists(err) {
cc.resyncTask(pod)
Expand All @@ -292,6 +297,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
waitCreationGroup.Wait()

if len(creationErrs) != 0 {
cc.recorder.Event(job, v1.EventTypeWarning, k8scontroller.FailedCreatePodReason,
fmt.Sprintf("Error creating pods: %+v", creationErrs))
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}

Expand Down Expand Up @@ -321,6 +328,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
waitDeletionGroup.Wait()

if len(deletionErrs) != 0 {
cc.recorder.Event(job, v1.EventTypeWarning, k8scontroller.FailedDeletePodReason,
fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}

Expand Down Expand Up @@ -490,7 +499,7 @@ func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) error {
glog.Errorf("Failed to delete pod %s/%s for Job %s, err %#v",
pod.Namespace, pod.Name, jobName, err)

return err
return fmt.Errorf("failed to delete pod %s, err %#v", pod.Name, err)
}

return nil
Expand Down Expand Up @@ -530,9 +539,9 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {
return &minAvailableTasksRes
}

func (cc *Controller) initJobStatus(job *vkv1.Job) (*vkv1.Job, error) {
func (cc *Controller) initJobStatus(job *vkv1.Job) (bool, *vkv1.Job, error) {
if job.Status.State.Phase != "" {
return job, nil
return false, job, nil
}

job.Status.State.Phase = vkv1.Pending
Expand All @@ -541,13 +550,13 @@ func (cc *Controller) initJobStatus(job *vkv1.Job) (*vkv1.Job, error) {
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return nil, err
return false, nil, err
}
if err := cc.cache.Update(newJob); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, err)
return nil, err
return false, nil, err
}

return newJob, nil
return true, newJob, nil
}
5 changes: 5 additions & 0 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ func TestCreateJobFunc(t *testing.T) {
Name: "job1",
Namespace: namespace,
},
Status: v1alpha1.JobStatus{
State: v1alpha1.JobState{
Phase: v1alpha1.Pending,
},
},
},
PodGroup: &kbv1aplha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ func (cc *Controller) addPod(obj interface{}) {
return
}

if pod.DeletionTimestamp != nil {
cc.deletePod(pod)
return
}

req := apis.Request{
Namespace: pod.Namespace,
JobName: jobName,
Expand Down Expand Up @@ -209,6 +214,15 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
return
}

if newPod.ResourceVersion == oldPod.ResourceVersion {
return
}

if newPod.DeletionTimestamp != nil {
cc.deletePod(newObj)
return
}

if err := cc.cache.UpdatePod(newPod); err != nil {
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
newPod.Namespace, newPod.Name, err)
Expand Down
10 changes: 6 additions & 4 deletions pkg/controllers/job/job_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
kubeclientset "k8s.io/client-go/kubernetes"
vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
Expand Down Expand Up @@ -67,10 +68,11 @@ func newController() *Controller {
func buildPod(namespace, name string, p v1.PodPhase, labels map[string]string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)),
Name: name,
Namespace: namespace,
Labels: labels,
UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)),
Name: name,
Namespace: namespace,
Labels: labels,
ResourceVersion: string(uuid.NewUUID()),
},
Status: v1.PodStatus{
Phase: p,
Expand Down
Loading

0 comments on commit 7cdd739

Please sign in to comment.