Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: frequently create and delete vcjobs with the same name, and the … #3771

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/controllers/apis/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package apis
import (
"fmt"

"k8s.io/apimachinery/pkg/types"

"volcano.sh/apis/pkg/apis/bus/v1alpha1"
flowv1alpha1 "volcano.sh/apis/pkg/apis/flow/v1alpha1"
)
Expand All @@ -27,6 +29,7 @@ import (
type Request struct {
Namespace string
JobName string
JobUid types.UID
TaskName string
QueueName string

Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -140,9 +141,13 @@ func (cc *jobcontroller) addPod(obj interface{}) {
klog.Errorf("Failed to convert %v to v1.Pod", obj)
return
}

var jobUid types.UID
// Filter out pods that are not created from volcano job
if !isControlledBy(pod, helpers.JobKind) {
return
} else {
jobUid = metav1.GetControllerOf(pod).UID
}

jobName, found := pod.Annotations[batch.JobNameKey]
Expand Down Expand Up @@ -174,6 +179,7 @@ func (cc *jobcontroller) addPod(obj interface{}) {
req := apis.Request{
Namespace: pod.Namespace,
JobName: jobName,
JobUid: jobUid,

Event: bus.OutOfSyncEvent,
JobVersion: int32(dVersion),
Expand Down Expand Up @@ -201,9 +207,12 @@ func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
return
}

var jobUid types.UID
// Filter out pods that are not created from volcano job
if !isControlledBy(newPod, helpers.JobKind) {
return
} else {
jobUid = metav1.GetControllerOf(newPod).UID
}

if newPod.ResourceVersion == oldPod.ResourceVersion {
Expand Down Expand Up @@ -275,6 +284,7 @@ func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) {
req := apis.Request{
Namespace: newPod.Namespace,
JobName: jobName,
JobUid: jobUid,
TaskName: taskName,

Event: event,
Expand Down Expand Up @@ -303,9 +313,12 @@ func (cc *jobcontroller) deletePod(obj interface{}) {
}
}

var jobUid types.UID
// Filter out pods that are not created from volcano job
if !isControlledBy(pod, helpers.JobKind) {
return
} else {
jobUid = metav1.GetControllerOf(pod).UID
}

taskName, found := pod.Annotations[batch.TaskSpecKey]
Expand Down Expand Up @@ -339,6 +352,7 @@ func (cc *jobcontroller) deletePod(obj interface{}) {
req := apis.Request{
Namespace: pod.Namespace,
JobName: jobName,
JobUid: jobUid,
TaskName: taskName,

Event: bus.PodEvictedEvent,
Expand Down
8 changes: 8 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action {
return v1alpha1.SyncJobAction
}

// Solve the scenario: When pod events accumulate and vcjobs with the same name are frequently created,
// it is easy for the pod to cause abnormal status of the newly created vcjob with the same name.
if len(req.JobUid) != 0 && job != nil && req.JobUid != job.UID {
klog.V(2).Infof("The req belongs to job(%s/%s) and job uid is %v, but the uid of job(%s/%s) is %v in cache, perform %v action",
req.Namespace, req.JobName, req.JobUid, job.Namespace, job.Name, job.UID, v1alpha1.SyncJobAction)
return v1alpha1.SyncJobAction
}

// For all the requests triggered from discarded job resources will perform sync action instead
if req.JobVersion < job.Status.Version {
klog.Infof("Request %s is outdated, will perform sync instead.", req)
Expand Down
44 changes: 44 additions & 0 deletions pkg/controllers/job/job_controller_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,50 @@ func TestApplyPolicies(t *testing.T) {
},
ReturnVal: busv1alpha1.SyncJobAction,
},
{
Name: "Test Apply policies where job uid is inconsistent, ignore the existing policy action in the job and execute syncjob",
Job: &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
UID: "job1-uid-10001",
},
Spec: v1alpha1.JobSpec{
SchedulerName: "volcano",
Tasks: []v1alpha1.TaskSpec{
{
Name: "task1",
Replicas: 6,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: "pods",
Namespace: namespace,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "Containers",
},
},
},
},
Policies: []v1alpha1.LifecyclePolicy{
{
Action: busv1alpha1.TerminateJobAction,
Event: busv1alpha1.PodEvictedEvent,
ExitCode: &errorCode0,
},
},
},
},
},
},
Request: &apis.Request{
JobUid: "job1-uid-10000",
Event: busv1alpha1.PodEvictedEvent,
},
ReturnVal: busv1alpha1.SyncJobAction,
},
{
Name: "Test Apply policies where version is outdated",
Job: &v1alpha1.Job{
Expand Down
Loading