diff --git a/pkg/controller/broadcastjob/broadcastjob_controller.go b/pkg/controller/broadcastjob/broadcastjob_controller.go index e919f800c2..360205b621 100644 --- a/pkg/controller/broadcastjob/broadcastjob_controller.go +++ b/pkg/controller/broadcastjob/broadcastjob_controller.go @@ -245,7 +245,15 @@ func (r *ReconcileBroadcastJob) Reconcile(_ context.Context, request reconcile.R failed := int32(len(failedPods)) succeeded := int32(len(succeededPods)) - desiredNodes, restNodesToRunPod, podsToDelete := getNodesToRunPod(nodes, job, existingNodeToPodMap) + isDeadLine := func(job *appsv1alpha1.BroadcastJob) bool { + if job.Spec.CompletionPolicy.Type == appsv1alpha1.Always && + job.Spec.CompletionPolicy.ActiveDeadlineSeconds != nil { + return time.Since(job.CreationTimestamp.Time) >= time.Duration(*job.Spec.CompletionPolicy.ActiveDeadlineSeconds)*time.Second + } + return false + } + + desiredNodes, restNodesToRunPod, podsToDelete := getNodesToRunPod(nodes, job, existingNodeToPodMap, isDeadLine) desired := int32(len(desiredNodes)) klog.Infof("%s/%s has %d/%d nodes remaining to schedule pods", job.Namespace, job.Name, len(restNodesToRunPod), desired) klog.Infof("Before broadcastjob reconcile %s/%s, desired=%d, active=%d, failed=%d", job.Namespace, job.Name, desired, active, failed) @@ -514,7 +522,9 @@ func isJobFailed(job *appsv1alpha1.BroadcastJob, pods []*corev1.Pod) (bool, stri // * restNodesToRunPod: the nodes do not have pods running yet, excluding the nodes not satisfying constraints such as affinity, taints // * podsToDelete: the pods that do not satisfy the node constraint any more func getNodesToRunPod(nodes *corev1.NodeList, job *appsv1alpha1.BroadcastJob, - existingNodeToPodMap map[string]*corev1.Pod) (map[string]*corev1.Pod, []*corev1.Node, []*corev1.Pod) { + existingNodeToPodMap map[string]*corev1.Pod, + isDeadline func(job *appsv1alpha1.BroadcastJob) bool, +) (map[string]*corev1.Pod, []*corev1.Node, []*corev1.Pod) { var podsToDelete []*corev1.Pod var restNodesToRunPod []*corev1.Node @@ -530,6 +540,10 @@ func getNodesToRunPod(nodes *corev1.NodeList, job *appsv1alpha1.BroadcastJob, klog.Infof("Pod %s does not fit on node %s due to %v", pod.Name, node.Name, err) podsToDelete = append(podsToDelete, pod) continue + } else if isDeadline(job) { + klog.Infof("Pod %s need to delete due to deadline", pod.Name) + podsToDelete = append(podsToDelete, pod) + continue } desiredNodes[node.Name] = pod } else {