Skip to content

Commit

Permalink
scheduler: add new pod estimate with loadaware plugin (#1992)
Browse files Browse the repository at this point in the history
Signed-off-by: zwForrest <756495135@qq.com>
Co-authored-by: zengwang1 <zengwang1@xiaomi.com>
  • Loading branch information
zwForrest and zengwang1 authored Jun 12, 2024
1 parent ba299ed commit 0d57e7e
Show file tree
Hide file tree
Showing 2 changed files with 473 additions and 99 deletions.
170 changes: 71 additions & 99 deletions pkg/scheduler/plugins/loadaware/load_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,112 +149,42 @@ func (p *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *c
}
return nil
}

filterProfile := generateUsageThresholdsFilterProfile(node, p.args)
if len(filterProfile.ProdUsageThresholds) > 0 && extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd {
status := p.filterProdUsage(node, nodeMetric, filterProfile.ProdUsageThresholds)
if !status.IsSuccess() {
return status
}
} else {
var usageThresholds map[corev1.ResourceName]int64
if filterProfile.AggregatedUsage != nil {
usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
} else {
usageThresholds = filterProfile.UsageThresholds
}
if len(usageThresholds) > 0 {
status := p.filterNodeUsage(node, nodeMetric, filterProfile)
if !status.IsSuccess() {
return status
}
}
if nodeMetric.Status.NodeMetric == nil {
klog.Warningf("nodeMetrics(%s) should not be nil.", node.Name)
return nil
}

return nil
}

func (p *Plugin) filterNodeUsage(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, filterProfile *usageThresholdsFilterProfile) *framework.Status {
if nodeMetric.Status.NodeMetric == nil {
allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
klog.ErrorS(err, "Estimated node allocatable failed!", "node", node.Name)
return nil
}
filterProfile := generateUsageThresholdsFilterProfile(node, p.args)
prodPod := len(filterProfile.ProdUsageThresholds) > 0 && extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd

var nodeUsage *slov1alpha1.ResourceMap
var usageThresholds map[corev1.ResourceName]int64
if filterProfile.AggregatedUsage != nil {
usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
if prodPod {
usageThresholds = filterProfile.ProdUsageThresholds
} else {
usageThresholds = filterProfile.UsageThresholds
}

for resourceName, threshold := range usageThresholds {
if threshold == 0 {
continue
}
allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
klog.ErrorS(err, "Failed to EstimateNode", "node", node.Name)
return nil
}
total := allocatable[resourceName]
if total.IsZero() {
continue
}
// TODO(joseph): maybe we should also estimate the Pods that were just scheduled but have not reported yet
var nodeUsage *slov1alpha1.ResourceMap
if filterProfile.AggregatedUsage != nil {
nodeUsage = getTargetAggregatedUsage(
nodeMetric,
filterProfile.AggregatedUsage.UsageAggregatedDuration,
filterProfile.AggregatedUsage.UsageAggregationType,
)
usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
} else {
nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
}
if nodeUsage == nil {
continue
}

used := nodeUsage.ResourceList[resourceName]
usage := int64(math.Round(float64(used.MilliValue()) / float64(total.MilliValue()) * 100))
if usage >= threshold {
reason := ErrReasonUsageExceedThreshold
if filterProfile.AggregatedUsage != nil {
reason = ErrReasonAggregatedUsageExceedThreshold
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(reason, resourceName))
usageThresholds = filterProfile.UsageThresholds
}
}
return nil
}

func (p *Plugin) filterProdUsage(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, prodUsageThresholds map[corev1.ResourceName]int64) *framework.Status {
if len(nodeMetric.Status.PodsMetric) == 0 {
estimatedUsed, err := p.GetEstimatedUsed(node.Name, nodeMetric, pod, nodeUsage, prodPod)
if err != nil {
klog.ErrorS(err, "GetEstimatedUsed failed!", "node", node.Name)
return nil
}

// TODO(joseph): maybe we should also estimate the Pods that were just scheduled but have not reported yet
podMetrics := buildPodMetricMap(p.podLister, nodeMetric, true)
prodPodUsages, _ := sumPodUsages(podMetrics, nil)
for resourceName, threshold := range prodUsageThresholds {
if threshold == 0 {
continue
}
allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
klog.ErrorS(err, "Failed to EstimateNode", "node", node.Name)
return nil
}
total := allocatable[resourceName]
if total.IsZero() {
continue
}
used := prodPodUsages[resourceName]
usage := int64(math.Round(float64(used.MilliValue()) / float64(total.MilliValue()) * 100))
if usage >= threshold {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, resourceName))
}
}
return nil
return filterNodeUsage(usageThresholds, estimatedUsed, allocatable, prodPod, filterProfile)
}

func (p *Plugin) ScoreExtensions() framework.ScoreExtensions {
Expand Down Expand Up @@ -291,13 +221,44 @@ func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *co
if p.args.NodeMetricExpirationSeconds != nil && isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds) {
return 0, nil
}
if nodeMetric.Status.NodeMetric == nil {
klog.Warningf("nodeMetrics(%s) should not be nil.", node.Name)
return 0, nil
}

prodPod := extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd && p.args.ScoreAccordingProdUsage
var nodeUsage *slov1alpha1.ResourceMap
if !prodPod {
if scoreWithAggregation(p.args.Aggregated) {
nodeUsage = getTargetAggregatedUsage(nodeMetric, &p.args.Aggregated.ScoreAggregatedDuration, p.args.Aggregated.ScoreAggregationType)
} else {
nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
}
}
estimatedUsed, err := p.GetEstimatedUsed(nodeName, nodeMetric, pod, nodeUsage, prodPod)
if err != nil {
klog.ErrorS(err, "GetEstimatedUsed failed!", "node", node.Name)
return 0, nil
}

allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
klog.ErrorS(err, "Estimated node allocatable failed!", "node", node.Name)
return 0, nil
}
score := loadAwareSchedulingScorer(p.args.ResourceWeights, estimatedUsed, allocatable)
return score, nil
}

func (p *Plugin) GetEstimatedUsed(nodeName string, nodeMetric *slov1alpha1.NodeMetric, pod *corev1.Pod, nodeUsage *slov1alpha1.ResourceMap, prodPod bool) (map[corev1.ResourceName]int64, error) {
if nodeMetric == nil {
return nil, nil
}
podMetrics := buildPodMetricMap(p.podLister, nodeMetric, prodPod)

estimatedUsed, err := p.estimator.EstimatePod(pod)
if err != nil {
return 0, nil
return nil, err
}
assignedPodEstimatedUsed, estimatedPods := p.estimatedAssignedPodUsed(nodeName, nodeMetric, podMetrics, prodPod)
for resourceName, value := range assignedPodEstimatedUsed {
Expand All @@ -310,12 +271,6 @@ func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *co
}
} else {
if nodeMetric.Status.NodeMetric != nil {
var nodeUsage *slov1alpha1.ResourceMap
if scoreWithAggregation(p.args.Aggregated) {
nodeUsage = getTargetAggregatedUsage(nodeMetric, &p.args.Aggregated.ScoreAggregatedDuration, p.args.Aggregated.ScoreAggregationType)
} else {
nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
}
if nodeUsage != nil {
for resourceName, quantity := range nodeUsage.ResourceList {
if q := estimatedPodActualUsages[resourceName]; !q.IsZero() {
Expand All @@ -329,13 +284,30 @@ func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *co
}
}
}
return estimatedUsed, nil
}

allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
return 0, nil
// filterNodeUsage checks the estimated usage of each resource against its
// configured threshold and reports the first resource that exceeds it.
//
// For every entry in usageThresholds it computes the utilization percentage
// round(estimatedUsed[resource] / total * 100), where total is derived from
// allocatable[resource] via getResourceValue. If the percentage is strictly
// greater than the threshold, it returns an Unschedulable status whose message
// is built from ErrReasonUsageExceedThreshold, or from
// ErrReasonAggregatedUsageExceedThreshold when prodPod is false and
// filterProfile.AggregatedUsage is set. It returns nil when no threshold is
// exceeded.
//
// filterProfile is dereferenced only when prodPod is false.
func filterNodeUsage(usageThresholds, estimatedUsed map[corev1.ResourceName]int64, allocatable corev1.ResourceList, prodPod bool, filterProfile *usageThresholdsFilterProfile) *framework.Status {
for resourceName, value := range usageThresholds {
// A zero threshold is skipped, so that resource is not checked.
if value == 0 {
continue
}
// NOTE(review): getResourceValue is defined elsewhere; presumably it returns
// the quantity in the same unit as estimatedUsed — confirm.
total := getResourceValue(resourceName, allocatable[resourceName])
// Skip resources with no allocatable amount; this also avoids dividing by zero.
if total == 0 {
continue
}
// A resource missing from estimatedUsed reads as 0 from the map.
usage := int64(math.Round(float64(estimatedUsed[resourceName]) / float64(total) * 100))
// Usage equal to the threshold is still accepted; only usage > threshold rejects.
if usage <= value {
continue
}

reason := ErrReasonUsageExceedThreshold
if !prodPod && filterProfile.AggregatedUsage != nil {
reason = ErrReasonAggregatedUsageExceedThreshold
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(reason, resourceName))
}
// NOTE(review): the next two lines reference `p`, which is not a parameter of
// this function, and return two values although the function returns a single
// *framework.Status. They look like removed-side diff lines from Score that
// were captured in this view — confirm against the repository.
score := loadAwareSchedulingScorer(p.args.ResourceWeights, estimatedUsed, allocatable)
return score, nil
// No threshold exceeded.
return nil
}

func (p *Plugin) estimatedAssignedPodUsed(nodeName string, nodeMetric *slov1alpha1.NodeMetric, podMetrics map[string]corev1.ResourceList, filterProdPod bool) (map[corev1.ResourceName]int64, sets.String) {
Expand Down
Loading

0 comments on commit 0d57e7e

Please sign in to comment.