add migration object limiter for namespace
Signed-off-by: songtao98 <songtao2603060@gmail.com>
songtao98 committed May 27, 2024
1 parent f436876 commit 37e3272
Showing 3 changed files with 327 additions and 41 deletions.
pkg/descheduler/apis/config/types_pluginargs.go (3 changes: 2 additions & 1 deletion)
@@ -115,7 +115,8 @@ type MigrationControllerArgs struct {
 type MigrationLimitObjectType string
 
 const (
-    MigrationLimitObjectWorkload MigrationLimitObjectType = "workload"
+    MigrationLimitObjectWorkload  MigrationLimitObjectType = "workload"
+    MigrationLimitObjectNamespace MigrationLimitObjectType = "namespace"
 )
 
 type ObjectLimiterMap map[MigrationLimitObjectType]MigrationObjectLimiter
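With the new constant, a single ObjectLimiterMap can carry one independent limit per scope. The following is a minimal, illustrative sketch (not taken from this commit) of such a map, assuming the repository's usual module path and that MigrationObjectLimiter keeps its existing Duration (metav1.Duration) and MaxMigrating (*intstr.IntOrString) fields, which are not shown in this diff:

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"

	deschedulerconfig "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
)

func main() {
	// Assumed field shapes: Duration metav1.Duration, MaxMigrating *intstr.IntOrString.
	maxPerWorkload := intstr.FromInt(1)
	maxPerNamespace := intstr.FromInt(3)
	limiters := deschedulerconfig.ObjectLimiterMap{
		// Hypothetical setting: at most 1 eviction per workload within 5 minutes.
		deschedulerconfig.MigrationLimitObjectWorkload: {
			Duration:     metav1.Duration{Duration: 5 * time.Minute},
			MaxMigrating: &maxPerWorkload,
		},
		// Hypothetical setting: at most 3 evictions per namespace within 5 minutes.
		deschedulerconfig.MigrationLimitObjectNamespace: {
			Duration:     metav1.Duration{Duration: 5 * time.Minute},
			MaxMigrating: &maxPerNamespace,
		},
	}
	fmt.Println(len(limiters)) // two independent limiter scopes
}

The workload scope keeps its per-owner behavior, while the namespace scope caps evictions across all pods in a namespace.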
pkg/descheduler/controllers/migration/arbitrator/filter.go (117 changes: 77 additions & 40 deletions)
@@ -60,7 +60,7 @@ type filter struct {
 
     args *deschedulerconfig.MigrationControllerArgs
     controllerFinder controllerfinder.Interface
-    objectLimiters map[types.UID]*rate.Limiter
+    objectLimiters map[string]*rate.Limiter
     limiterCache *gocache.Cache
     limiterLock sync.Mutex
 
@@ -421,69 +421,106 @@ func (f *filter) trackEvictedPod(pod *corev1.Pod) {
     if f.objectLimiters == nil || f.limiterCache == nil {
         return
     }
-    ownerRef := metav1.GetControllerOf(pod)
-    if ownerRef == nil {
-        return
-    }
-
-    objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
-    if !ok || objectLimiterArgs.Duration.Seconds() == 0 {
-        return
-    }
-
-    var maxMigratingReplicas int
-    if expectedReplicas, err := f.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
-        maxMigrating := objectLimiterArgs.MaxMigrating
-        if maxMigrating == nil {
-            maxMigrating = f.args.MaxMigratingPerWorkload
+    for limiterType, objectLimiterArgs := range f.args.ObjectLimiters {
+        if objectLimiterArgs.Duration.Seconds() == 0 {
+            continue
         }
-        maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
-    }
-    if maxMigratingReplicas == 0 {
-        return
+        var limiterKey string
+        var processScope string
+        switch limiterType {
+        case deschedulerconfig.MigrationLimitObjectWorkload:
+            ownerRef := metav1.GetControllerOf(pod)
+            if ownerRef == nil {
+                continue
+            }
+            limiterKey = string(ownerRef.UID)
+            processScope = fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
+        case deschedulerconfig.MigrationLimitObjectNamespace:
+            limiterKey = pod.Namespace
+            processScope = pod.Namespace
+        }
+        var maxMigratingReplicas int
+        if expectedReplicas, err := f.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
+            maxMigrating := objectLimiterArgs.MaxMigrating
+            if maxMigrating == nil {
+                maxMigrating = f.args.MaxMigratingPerWorkload
+            }
+            maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
+        }
+        if maxMigratingReplicas == 0 {
+            return
+        }
+        limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds())
+
+        f.trackWithLocker(limit, limiterKey, processScope, string(limiterType))
     }
+}
 
+func (f *filter) trackWithLocker(limit rate.Limit, limiterKey, processScope, limiterType string) {
     f.limiterLock.Lock()
     defer f.limiterLock.Unlock()
 
-    uid := ownerRef.UID
-    limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds())
-    limiter := f.objectLimiters[uid]
+    limiter := f.objectLimiters[limiterKey]
     if limiter == nil {
         limiter = rate.NewLimiter(limit, 1)
-        f.objectLimiters[uid] = limiter
+        f.objectLimiters[limiterKey] = limiter
     } else if limiter.Limit() != limit {
         limiter.SetLimit(limit)
     }
 
     if !limiter.AllowN(f.clock.Now(), 1) {
-        klog.Infof("The workload %s/%s/%s has been frequently descheduled recently and needs to be limited for a period of time", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
+        klog.Infof("The %s %s has been frequently descheduled recently and needs to be limited for a period of time", limiterType, processScope)
     }
-    f.limiterCache.Set(string(uid), 0, gocache.DefaultExpiration)
+    f.limiterCache.Set(limiterKey, 0, gocache.DefaultExpiration)
 }
 
 func (f *filter) filterLimitedObject(pod *corev1.Pod) bool {
     if f.objectLimiters == nil || f.limiterCache == nil {
         return true
     }
-    objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
-    if !ok || objectLimiterArgs.Duration.Duration == 0 {
-        return true
-    }
-    if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
-        f.limiterLock.Lock()
-        defer f.limiterLock.Unlock()
-        if limiter := f.objectLimiters[ownerRef.UID]; limiter != nil {
-            if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 {
-                klog.V(4).InfoS("Pod fails the following checks", "pod", klog.KObj(pod), "checks", "limitedObject",
-                    "owner", fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion))
-                return false
-            }
+    for limiterType, objectLimiterArgs := range f.args.ObjectLimiters {
+        if objectLimiterArgs.Duration.Duration == 0 {
+            continue
+        }
+        limiterKey, logInfo := getLimiterKeyAndLogInfo(pod, limiterType)
+        if limiterKey == "" {
+            continue
+        }
+        if !f.filterWithLocker(limiterKey) {
+            klog.V(4).InfoS("Pod fails the following checks", logInfo...)
+            return false
         }
     }
     return true
 }
 
+func (f *filter) filterWithLocker(limiterKey string) bool {
+    f.limiterLock.Lock()
+    defer f.limiterLock.Unlock()
+    if limiter := f.objectLimiters[limiterKey]; limiter != nil {
+        if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 {
+            return false
+        }
+    }
+    return true
+}
+
+func getLimiterKeyAndLogInfo(pod *corev1.Pod, limiterType deschedulerconfig.MigrationLimitObjectType) (string, []interface{}) {
+    var limiterKey string
+    logInfo := []interface{}{"pod", klog.KObj(pod), "checks", fmt.Sprintf("limitedObject: %s", limiterType)}
+    switch limiterType {
+    case deschedulerconfig.MigrationLimitObjectWorkload:
+        if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
+            limiterKey = string(ownerRef.UID)
+            logInfo = append(logInfo, "owner", fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion))
+        }
+    case deschedulerconfig.MigrationLimitObjectNamespace:
+        limiterKey = pod.Namespace
+        logInfo = append(logInfo, "namespace", pod.Namespace)
+    }
+    return limiterKey, logInfo
+}
+
 func (f *filter) initObjectLimiters() {
     var trackExpiration time.Duration
     for _, v := range f.args.ObjectLimiters {
@@ -492,13 +529,13 @@ func (f *filter) initObjectLimiters() {
         }
     }
     if trackExpiration > 0 {
-        f.objectLimiters = make(map[types.UID]*rate.Limiter)
+        f.objectLimiters = make(map[string]*rate.Limiter)
         limiterExpiration := trackExpiration + trackExpiration/2
         f.limiterCache = gocache.New(limiterExpiration, limiterExpiration)
         f.limiterCache.OnEvicted(func(s string, _ interface{}) {
            f.limiterLock.Lock()
            defer f.limiterLock.Unlock()
-            delete(f.objectLimiters, types.UID(s))
+            delete(f.objectLimiters, s)
        })
     }
 }
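To illustrate the effect of the new namespace scope, here is a stand-alone sketch of the same token-bucket pattern the filter applies: one rate.Limiter per key, with limit = maxMigrating / window and a burst of 1, so a second eviction against the same key inside the window is reported as limited. The trackEviction helper, the namespace names, and the 2-per-5-minutes setting are hypothetical and chosen only for the example; this is not code from the repository.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// trackEviction mimics the commit's pattern: one limiter per key (here the
// pod's namespace), with limit = maxMigrating / window and a burst of 1.
func trackEviction(limiters map[string]*rate.Limiter, key string, maxMigrating int, window time.Duration) bool {
	limit := rate.Limit(maxMigrating) / rate.Limit(window.Seconds())
	limiter, ok := limiters[key]
	if !ok {
		limiter = rate.NewLimiter(limit, 1)
		limiters[key] = limiter
	} else if limiter.Limit() != limit {
		limiter.SetLimit(limit)
	}
	// AllowN consumes a token; false means this key is being descheduled too often.
	return limiter.AllowN(time.Now(), 1)
}

func main() {
	limiters := map[string]*rate.Limiter{}
	// Hypothetical setting: at most 2 migrations per namespace per 5 minutes.
	fmt.Println(trackEviction(limiters, "team-a", 2, 5*time.Minute)) // true: first eviction allowed
	fmt.Println(trackEviction(limiters, "team-a", 2, 5*time.Minute)) // false: namespace is rate limited
	fmt.Println(trackEviction(limiters, "team-b", 2, 5*time.Minute)) // true: other namespaces are unaffected
}

Because the limiter map is keyed by plain strings, owner UIDs (workload scope) and namespace names (namespace scope) can share one map, which is why objectLimiters changed from map[types.UID]*rate.Limiter to map[string]*rate.Limiter.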
(diff of the third changed file not loaded)
