Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koord-descheduler: add migration object limiter for namespace #2068

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/descheduler/apis/config/types_pluginargs.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ type MigrationControllerArgs struct {
type MigrationLimitObjectType string

const (
MigrationLimitObjectWorkload MigrationLimitObjectType = "workload"
MigrationLimitObjectWorkload MigrationLimitObjectType = "workload"
MigrationLimitObjectNamespace MigrationLimitObjectType = "namespace"
)

type ObjectLimiterMap map[MigrationLimitObjectType]MigrationObjectLimiter
Expand Down
1 change: 1 addition & 0 deletions pkg/descheduler/apis/config/v1alpha2/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
MigrationLimitObjectWorkload: {
Duration: metav1.Duration{Duration: 5 * time.Minute},
},
// namespace object limiter is disabled as default
}

defaultLoadAnomalyCondition = &LoadAnomalyCondition{
Expand Down
4 changes: 2 additions & 2 deletions pkg/descheduler/controllers/migration/arbitrator/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,13 @@ func (f *filter) initFilters(args *deschedulerconfig.MigrationControllerArgs, ha
if err != nil {
return err
}
retriablePodFilters := podutil.WrapFilterFuncs(
retryablePodFilters := podutil.WrapFilterFuncs(
f.filterMaxMigratingPerNode,
f.filterMaxMigratingPerNamespace,
f.filterMaxMigratingOrUnavailablePerWorkload,
)
f.retryablePodFilter = func(pod *corev1.Pod) bool {
return evictionsutil.HaveEvictAnnotation(pod) || retriablePodFilters(pod)
return evictionsutil.HaveEvictAnnotation(pod) || retryablePodFilters(pod)
}
f.nonRetryablePodFilter = func(pod *corev1.Pod) bool {
// any annotated as evictable pod pass non-retryable filter
Expand Down
167 changes: 111 additions & 56 deletions pkg/descheduler/controllers/migration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ type Reconciler struct {
assumedCache *assumedCache
clock clock.Clock

arbitrator arbitrator.Arbitrator
objectLimiters map[types.UID]*rate.Limiter
limiterCache *gocache.Cache
limiterLock sync.Mutex
arbitrator arbitrator.Arbitrator

limiterMap map[deschedulerconfig.MigrationLimitObjectType]map[string]*rate.Limiter
limiterCacheMap map[deschedulerconfig.MigrationLimitObjectType]*gocache.Cache
limiterLock sync.Mutex
}

func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
Expand Down Expand Up @@ -452,27 +453,67 @@ func (r *Reconciler) preparePodRef(ctx context.Context, job *sev1alpha1.PodMigra
}

func (r *Reconciler) checkPodExceedObjectLimiter(pod *corev1.Pod) bool {
if r.objectLimiters == nil || r.limiterCache == nil {
if r.limiterMap == nil || len(r.limiterMap) == 0 || r.limiterCacheMap == nil || len(r.limiterCacheMap) == 0 {
return false
}
objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
if !ok || objectLimiterArgs.Duration.Duration == 0 {
for limiterType, objectLimiterArgs := range r.args.ObjectLimiters {
if objectLimiterArgs.Duration.Duration == 0 {
continue
}
limiterKey, processScope := getLimiterKeyAndProcessScope(pod, limiterType)
if limiterKey == "" {
continue
}
logInfo := getLogInfo(pod, limiterType, processScope)
if r.exceeded(limiterKey, limiterType) {
klog.V(4).InfoS("Pod fails the following checks", logInfo...)
return true
}
}
return false
}

func (r *Reconciler) exceeded(limiterKey string, limiterType deschedulerconfig.MigrationLimitObjectType) bool {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
limiters, ok := r.limiterMap[limiterType]
if !ok {
return false
}
if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
if limiter := r.objectLimiters[ownerRef.UID]; limiter != nil {
if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 {
klog.V(4).InfoS("Pod fails the following checks", "pod", klog.KObj(pod), "checks", "limitedObject",
"owner", fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion))
return true
}
limiter := limiters[limiterKey]
if limiter != nil {
if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 {
return true
}
}
return false
}

// getLimiterKeyAndProcessScope derives the rate-limiter cache key and a
// human-readable scope description for pod under the given limiter type.
// An empty limiterKey means the limiter does not apply to this pod
// (e.g. a workload limiter on a pod without a controller owner).
func getLimiterKeyAndProcessScope(pod *corev1.Pod, limiterType deschedulerconfig.MigrationLimitObjectType) (limiterKey, processScope string) {
	switch limiterType {
	case deschedulerconfig.MigrationLimitObjectWorkload:
		if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
			limiterKey = string(ownerRef.UID)
			processScope = fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
		}
	case deschedulerconfig.MigrationLimitObjectNamespace:
		limiterKey = pod.Namespace
		// The namespace name is the scope description; fmt.Sprintf("%s", s)
		// was a redundant no-op conversion (staticcheck S1025).
		processScope = pod.Namespace
	}
	return limiterKey, processScope
}

// getLogInfo assembles the structured key/value pairs used to log why pod was
// rejected by the object limiter of the given type, tagging the scope with a
// type-appropriate key ("owner" or "namespace").
func getLogInfo(pod *corev1.Pod, limiterType deschedulerconfig.MigrationLimitObjectType, processScope string) []interface{} {
	kvs := []interface{}{
		"pod", klog.KObj(pod),
		"checks", fmt.Sprintf("limitedObject: %s", limiterType),
	}
	switch limiterType {
	case deschedulerconfig.MigrationLimitObjectWorkload:
		kvs = append(kvs, "owner", processScope)
	case deschedulerconfig.MigrationLimitObjectNamespace:
		kvs = append(kvs, "namespace", processScope)
	}
	return kvs
}

func (r *Reconciler) requeueJobIfObjectLimiterFailed(ctx context.Context, job *sev1alpha1.PodMigrationJob) bool {
if evictionsutil.HaveEvictAnnotation(job) {
return false
Expand Down Expand Up @@ -819,48 +860,59 @@ func (r *Reconciler) prepareJobWithReservationScheduleSuccess(ctx context.Contex
}

func (r *Reconciler) trackEvictedPod(pod *corev1.Pod) {
if r.objectLimiters == nil || r.limiterCache == nil {
return
}
ownerRef := metav1.GetControllerOf(pod)
if ownerRef == nil {
return
}

objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
if !ok || objectLimiterArgs.Duration.Seconds() == 0 {
if r.limiterMap == nil || len(r.limiterMap) == 0 || r.limiterCacheMap == nil || len(r.limiterCacheMap) == 0 {
return
}

var maxMigratingReplicas int
if expectedReplicas, err := r.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
maxMigrating := objectLimiterArgs.MaxMigrating
if maxMigrating == nil {
maxMigrating = r.args.MaxMigratingPerWorkload
for limiterType, objectLimiterArgs := range r.args.ObjectLimiters {
if objectLimiterArgs.Duration.Seconds() == 0 {
continue
}
maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
}
if maxMigratingReplicas == 0 {
return
limiterKey, processScope := getLimiterKeyAndProcessScope(pod, limiterType)
if limiterKey == "" {
continue
}
var maxMigratingReplicas int
if expectedReplicas, err := r.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
maxMigrating := objectLimiterArgs.MaxMigrating
if maxMigrating == nil {
maxMigrating = r.args.MaxMigratingPerWorkload
}
maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
}
if maxMigratingReplicas == 0 {
return
}
limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds())

r.track(limit, limiterKey, processScope, limiterType, maxMigratingReplicas)
}
}

// track consumes one token from the limiter identified by (limiterType,
// limiterKey), creating the limiter or updating its rate as needed, and
// refreshes its expiration entry so idle limiters are eventually evicted.
func (r *Reconciler) track(limit rate.Limit, limiterKey, processScope string, limiterType deschedulerconfig.MigrationLimitObjectType, maxMigratingReplicas int) {
	r.limiterLock.Lock()
	defer r.limiterLock.Unlock()

	limiters, ok := r.limiterMap[limiterType]
	if !ok {
		klog.Errorf("failed to find limiters for type %s", limiterType)
		return
	}
	limiter := limiters[limiterKey]
	if limiter == nil {
		// Burst equals the max migrating replicas for this object.
		limiter = rate.NewLimiter(limit, maxMigratingReplicas)
		limiters[limiterKey] = limiter
	} else if limiter.Limit() != limit {
		// Config (or replica count) changed; adjust the refill rate in place.
		limiter.SetLimit(limit)
	}

	if !limiter.AllowN(r.clock.Now(), 1) {
		klog.Infof("The %s %s has been frequently descheduled recently and needs to be limited for a period of time", limiterType, processScope)
	}
	limiterCache, ok := r.limiterCacheMap[limiterType]
	if !ok {
		klog.Errorf("failed to find limiterCache for type %s", limiterType)
		// Without a cache we cannot schedule expiration; bail out instead of
		// calling Set on a nil *gocache.Cache below.
		return
	}
	// Touching the cache entry (re)arms the TTL that garbage-collects the
	// limiter via OnEvicted.
	limiterCache.Set(limiterKey, 0, gocache.DefaultExpiration)
}

func (r *Reconciler) deleteReservation(ctx context.Context, job *sev1alpha1.PodMigrationJob) error {
Expand Down Expand Up @@ -1040,20 +1092,23 @@ func (r *Reconciler) PreEvictionFilter(pod *corev1.Pod) bool {
}

func (r *Reconciler) initObjectLimiters() {
var trackExpiration time.Duration
for _, v := range r.args.ObjectLimiters {
if v.Duration.Duration > trackExpiration {
trackExpiration = v.Duration.Duration
r.limiterMap = make(map[deschedulerconfig.MigrationLimitObjectType]map[string]*rate.Limiter)
r.limiterCacheMap = make(map[deschedulerconfig.MigrationLimitObjectType]*gocache.Cache)

for limiterType, limiterConfig := range r.args.ObjectLimiters {
var trackExpiration time.Duration
if limiterConfig.Duration.Duration > trackExpiration {
trackExpiration = limiterConfig.Duration.Duration
}
if trackExpiration > 0 {
r.limiterMap[limiterType] = make(map[string]*rate.Limiter)
limiterExpiration := trackExpiration + trackExpiration/2
r.limiterCacheMap[limiterType] = gocache.New(limiterExpiration, limiterExpiration)
r.limiterCacheMap[limiterType].OnEvicted(func(s string, _ interface{}) {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
delete(r.limiterMap[limiterType], s)
})
}
}
if trackExpiration > 0 {
r.objectLimiters = make(map[types.UID]*rate.Limiter)
limiterExpiration := trackExpiration + trackExpiration/2
r.limiterCache = gocache.New(limiterExpiration, limiterExpiration)
r.limiterCache.OnEvicted(func(s string, _ interface{}) {
r.limiterLock.Lock()
defer r.limiterLock.Unlock()
delete(r.objectLimiters, types.UID(s))
})
}
}
Loading