This repository has been archived by the owner on May 25, 2023. It is now read-only.

Replaced FIFO by workqueue. #584

Merged · 1 commit · Feb 11, 2019
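In short: the scheduler cache's two cache.FIFO queues (errTasks and deletedJobs) become rate-limiting workqueues, so failed tasks and not-yet-terminated jobs are retried with exponential backoff instead of through hand-rolled retry loops and timers. Before reading the diff, here is a minimal, self-contained sketch of the k8s.io/client-go/util/workqueue pattern this change adopts; the string item and its handling are illustrative only, not taken from the PR:

// Illustration only: the client-go rate-limiting workqueue pattern
// adopted by this PR, exercised with a toy string item.
package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // DefaultControllerRateLimiter combines per-item exponential
    // backoff with an overall token-bucket rate limit.
    q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // AddRateLimited enqueues the item once its backoff has elapsed;
    // repeated failures for the same item back off exponentially.
    q.AddRateLimited("task-1")

    // A worker blocks in Get until an item is ready or the queue
    // is shut down.
    item, shutdown := q.Get()
    if shutdown {
        return
    }
    fmt.Println("processing", item)

    // Forget resets the item's backoff after success; Done marks
    // the item as no longer in flight.
    q.Forget(item)
    q.Done(item)
    q.ShutDown()
}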
141 changes: 47 additions & 94 deletions pkg/scheduler/cache/cache.go
@@ -26,6 +26,7 @@ import (
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	infov1 "k8s.io/client-go/informers/core/v1"
 	policyv1 "k8s.io/client-go/informers/policy/v1beta1"
@@ -35,6 +36,7 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/scheduler/volumebinder"
 
@@ -79,8 +81,8 @@ type SchedulerCache struct {
 	Nodes  map[string]*kbapi.NodeInfo
 	Queues map[kbapi.QueueID]*kbapi.QueueInfo
 
-	errTasks    *cache.FIFO
-	deletedJobs *cache.FIFO
+	errTasks    workqueue.RateLimitingInterface
+	deletedJobs workqueue.RateLimitingInterface
 
 	namespaceAsQueue bool
 }
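For reference, the new field type combines a delaying queue with per-item rate limiting. Abridged (and lightly commented) from k8s.io/client-go/util/workqueue of this vintage:

// Abridged from k8s.io/client-go/util/workqueue.
type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited adds an item to the queue once the item's
    // rate limiter (e.g. exponential backoff) allows it.
    AddRateLimited(item interface{})

    // Forget clears the item's failure history so future
    // AddRateLimited calls start from the shortest backoff again.
    Forget(item interface{})

    // NumRequeues reports how many times the item was requeued.
    NumRequeues(item interface{}) int
}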
@@ -164,41 +166,13 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {
 	return dvb.volumeBinder.Binder.BindPodVolumes(task.Pod)
 }
 
-func taskKey(obj interface{}) (string, error) {
-	if obj == nil {
-		return "", fmt.Errorf("the object is nil")
-	}
-
-	task, ok := obj.(*kbapi.TaskInfo)
-
-	if !ok {
-		return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
-	}
-
-	return string(task.UID), nil
-}
-
-func jobKey(obj interface{}) (string, error) {
-	if obj == nil {
-		return "", fmt.Errorf("the object is nil")
-	}
-
-	job, ok := obj.(*kbapi.JobInfo)
-
-	if !ok {
-		return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
-	}
-
-	return string(job.UID), nil
-}
-
 func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool) *SchedulerCache {
 	sc := &SchedulerCache{
 		Jobs:             make(map[kbapi.JobID]*kbapi.JobInfo),
 		Nodes:            make(map[string]*kbapi.NodeInfo),
 		Queues:           make(map[kbapi.QueueID]*kbapi.QueueInfo),
-		errTasks:         cache.NewFIFO(taskKey),
-		deletedJobs:      cache.NewFIFO(jobKey),
+		errTasks:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		deletedJobs:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		kubeclient:       kubernetes.NewForConfigOrDie(config),
 		kbclient:         kbver.NewForConfigOrDie(config),
 		namespaceAsQueue: nsAsQueue,
@@ -324,10 +298,10 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
 	}
 
 	// Re-sync error tasks.
-	go sc.resync()
+	go wait.Until(sc.processResyncTask, 0, stopCh)
 
 	// Cleanup jobs.
-	go sc.cleanupJobs()
+	go wait.Until(sc.processCleanupJob, 0, stopCh)
 }
 
 func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
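A note on the period of 0: wait.Until re-invokes the function as soon as it returns, checking stopCh between invocations, and each invocation blocks inside the workqueue's Get until an item arrives or the queue shuts down, so these goroutines behave as classic blocking worker loops. Roughly, as a sketch (not the real implementation in k8s.io/apimachinery/pkg/util/wait):

// Sketch of wait.Until(f, 0, stopCh) semantics.
func until(f func(), stopCh <-chan struct{}) {
    for {
        select {
        case <-stopCh:
            return
        default:
        }
        f() // with a zero period, f is re-run immediately after it returns
    }
}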
@@ -388,7 +362,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
 	}
 
 	// Add new task to node.
-	node.UpdateTask(task)
+	if err := node.UpdateTask(task); err != nil {
+		return err
+	}
 
 	p := task.Pod
 
@@ -430,7 +406,9 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error
 	task.NodeName = hostname
 
 	// Add task to the node.
-	node.AddTask(task)
+	if err := node.AddTask(task); err != nil {
+		return err
+	}
 
 	p := task.Pod
 
@@ -476,77 +454,52 @@ func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string)
 func (sc *SchedulerCache) deleteJob(job *kbapi.JobInfo) {
 	glog.V(3).Infof("Try to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name)
 
-	time.AfterFunc(5*time.Second, func() {
-		sc.deletedJobs.AddIfNotPresent(job)
-	})
+	sc.deletedJobs.AddRateLimited(job)
 }
 
-func (sc *SchedulerCache) processCleanupJob() error {
-	_, err := sc.deletedJobs.Pop(func(obj interface{}) error {
-		job, ok := obj.(*kbapi.JobInfo)
-		if !ok {
-			return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
-		}
-
-		func() {
-			sc.Mutex.Lock()
-			defer sc.Mutex.Unlock()
-
-			if kbapi.JobTerminated(job) {
-				delete(sc.Jobs, job.UID)
-				glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
-			} else {
-				// Retry
-				sc.deleteJob(job)
-			}
-		}()
-
-		return nil
-	})
-
-	return err
-}
-
-func (sc *SchedulerCache) cleanupJobs() {
-	for {
-		err := sc.processCleanupJob()
-		if err != nil {
-			glog.Errorf("Failed to process job clean up: %v", err)
-		}
-	}
-}
+func (sc *SchedulerCache) processCleanupJob() {
+	obj, shutdown := sc.deletedJobs.Get()
+	if shutdown {
+		return
+	}
+
+	job, found := obj.(*kbapi.JobInfo)
+	if !found {
+		glog.Errorf("Failed to convert <%v> to *JobInfo", obj)
+		return
+	}
+
+	sc.Mutex.Lock()
+	defer sc.Mutex.Unlock()
+
+	if kbapi.JobTerminated(job) {
+		delete(sc.Jobs, job.UID)
+		glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
+	} else {
+		// Retry
+		sc.deleteJob(job)
+	}
+}
 
 func (sc *SchedulerCache) resyncTask(task *kbapi.TaskInfo) {
-	if err := sc.errTasks.AddIfNotPresent(task); err != nil {
-		glog.Errorf("Failed to re-sync tasks <%v/%v>: %v",
-			task.Namespace, task.Name, err)
-	}
+	sc.errTasks.AddRateLimited(task)
 }
 
-func (sc *SchedulerCache) resync() {
-	for {
-		err := sc.processResyncTask()
-		if err != nil {
-			glog.Errorf("Failed to process resync: %v", err)
-		}
-	}
-}
-
-func (sc *SchedulerCache) processResyncTask() error {
-	_, err := sc.errTasks.Pop(func(obj interface{}) error {
-		task, ok := obj.(*kbapi.TaskInfo)
-		if !ok {
-			return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
-		}
-
-		if err := sc.syncTask(task); err != nil {
-			glog.Errorf("Failed to sync pod <%v/%v>", task.Namespace, task.Name)
-			return err
-		}
-		return nil
-	})
-
-	return err
+func (sc *SchedulerCache) processResyncTask() {
+	obj, shutdown := sc.errTasks.Get()
+	if shutdown {
+		return
+	}
+	task, ok := obj.(*kbapi.TaskInfo)
+	if !ok {
+		glog.Errorf("failed to convert %v to *v1.Pod", obj)
+		return
+	}
+
+	if err := sc.syncTask(task); err != nil {
+		glog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name)
+		sc.resyncTask(task)
+	}
 }
 
 func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
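One review note: a canonical workqueue consumer also calls Done once it finishes an item (and Forget on success) so the queue can drop its in-flight bookkeeping; the handlers above, as shown in this diff, do not. A hedged sketch of processResyncTask with that bookkeeping added, using the same fields as the PR; the Done and Forget lines are the addition, not part of this change:

// Sketch (not from this PR): the same worker with the canonical
// Done/Forget bookkeeping that workqueue consumers usually perform.
func (sc *SchedulerCache) processResyncTask() {
    obj, shutdown := sc.errTasks.Get()
    if shutdown {
        return
    }
    // Done tells the queue this item is no longer being processed;
    // without it the queue keeps the item marked as in flight.
    defer sc.errTasks.Done(obj)

    task, ok := obj.(*kbapi.TaskInfo)
    if !ok {
        glog.Errorf("failed to convert %v to *kbapi.TaskInfo", obj)
        return
    }

    if err := sc.syncTask(task); err != nil {
        glog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name)
        sc.resyncTask(task) // re-enqueue with backoff
        return
    }
    // Forget resets the per-item backoff after a successful sync.
    sc.errTasks.Forget(obj)
}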