Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: add new pod estimate with loadaware plugin #1992

Merged
merged 2 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 71 additions & 99 deletions pkg/scheduler/plugins/loadaware/load_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,112 +149,42 @@ func (p *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *c
}
return nil
}

filterProfile := generateUsageThresholdsFilterProfile(node, p.args)
if len(filterProfile.ProdUsageThresholds) > 0 && extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd {
status := p.filterProdUsage(node, nodeMetric, filterProfile.ProdUsageThresholds)
if !status.IsSuccess() {
return status
}
} else {
var usageThresholds map[corev1.ResourceName]int64
if filterProfile.AggregatedUsage != nil {
usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
} else {
usageThresholds = filterProfile.UsageThresholds
}
if len(usageThresholds) > 0 {
status := p.filterNodeUsage(node, nodeMetric, filterProfile)
if !status.IsSuccess() {
return status
}
}
if nodeMetric.Status.NodeMetric == nil {
klog.Warningf("nodeMetrics(%s) should not be nil.", node.Name)
return nil
}

return nil
}

func (p *Plugin) filterNodeUsage(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, filterProfile *usageThresholdsFilterProfile) *framework.Status {
if nodeMetric.Status.NodeMetric == nil {
allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
zwForrest marked this conversation as resolved.
Show resolved Hide resolved
klog.ErrorS(err, "Estimated node allocatable failed!", "node", node.Name)
return nil
}
filterProfile := generateUsageThresholdsFilterProfile(node, p.args)
prodPod := len(filterProfile.ProdUsageThresholds) > 0 && extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd

var nodeUsage *slov1alpha1.ResourceMap
var usageThresholds map[corev1.ResourceName]int64
if filterProfile.AggregatedUsage != nil {
usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
if prodPod {
usageThresholds = filterProfile.ProdUsageThresholds
} else {
usageThresholds = filterProfile.UsageThresholds
}

for resourceName, threshold := range usageThresholds {
if threshold == 0 {
continue
}
allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
klog.ErrorS(err, "Failed to EstimateNode", "node", node.Name)
return nil
}
total := allocatable[resourceName]
if total.IsZero() {
continue
}
// TODO(joseph): maybe we should also estimate Pods that were just scheduled and have not reported yet
var nodeUsage *slov1alpha1.ResourceMap
if filterProfile.AggregatedUsage != nil {
nodeUsage = getTargetAggregatedUsage(
nodeMetric,
filterProfile.AggregatedUsage.UsageAggregatedDuration,
filterProfile.AggregatedUsage.UsageAggregationType,
)
usageThresholds = filterProfile.AggregatedUsage.UsageThresholds
} else {
nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
}
if nodeUsage == nil {
continue
}

used := nodeUsage.ResourceList[resourceName]
usage := int64(math.Round(float64(used.MilliValue()) / float64(total.MilliValue()) * 100))
if usage >= threshold {
reason := ErrReasonUsageExceedThreshold
if filterProfile.AggregatedUsage != nil {
reason = ErrReasonAggregatedUsageExceedThreshold
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(reason, resourceName))
usageThresholds = filterProfile.UsageThresholds
}
}
return nil
}

func (p *Plugin) filterProdUsage(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, prodUsageThresholds map[corev1.ResourceName]int64) *framework.Status {
if len(nodeMetric.Status.PodsMetric) == 0 {
estimatedUsed, err := p.GetEstimatedUsed(node.Name, nodeMetric, pod, nodeUsage, prodPod)
if err != nil {
klog.ErrorS(err, "GetEstimatedUsed failed!", "node", node.Name)
return nil
}

// TODO(joseph): maybe we should also estimate Pods that were just scheduled and have not reported yet
podMetrics := buildPodMetricMap(p.podLister, nodeMetric, true)
prodPodUsages, _ := sumPodUsages(podMetrics, nil)
for resourceName, threshold := range prodUsageThresholds {
if threshold == 0 {
continue
}
allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
klog.ErrorS(err, "Failed to EstimateNode", "node", node.Name)
return nil
}
total := allocatable[resourceName]
if total.IsZero() {
continue
}
used := prodPodUsages[resourceName]
usage := int64(math.Round(float64(used.MilliValue()) / float64(total.MilliValue()) * 100))
if usage >= threshold {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, resourceName))
}
}
return nil
return filterNodeUsage(usageThresholds, estimatedUsed, allocatable, prodPod, filterProfile)
}

func (p *Plugin) ScoreExtensions() framework.ScoreExtensions {
Expand Down Expand Up @@ -291,13 +221,44 @@ func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *co
if p.args.NodeMetricExpirationSeconds != nil && isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds) {
return 0, nil
}
if nodeMetric.Status.NodeMetric == nil {
klog.Warningf("nodeMetrics(%s) should not be nil.", node.Name)
return 0, nil
}

prodPod := extension.GetPodPriorityClassWithDefault(pod) == extension.PriorityProd && p.args.ScoreAccordingProdUsage
var nodeUsage *slov1alpha1.ResourceMap
if !prodPod {
if scoreWithAggregation(p.args.Aggregated) {
nodeUsage = getTargetAggregatedUsage(nodeMetric, &p.args.Aggregated.ScoreAggregatedDuration, p.args.Aggregated.ScoreAggregationType)
} else {
nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
}
}
estimatedUsed, err := p.GetEstimatedUsed(nodeName, nodeMetric, pod, nodeUsage, prodPod)
if err != nil {
zwForrest marked this conversation as resolved.
Show resolved Hide resolved
klog.ErrorS(err, "GetEstimatedUsed failed!", "node", node.Name)
return 0, nil
}

allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
klog.ErrorS(err, "Estimated node allocatable failed!", "node", node.Name)
return 0, nil
}
score := loadAwareSchedulingScorer(p.args.ResourceWeights, estimatedUsed, allocatable)
return score, nil
}

func (p *Plugin) GetEstimatedUsed(nodeName string, nodeMetric *slov1alpha1.NodeMetric, pod *corev1.Pod, nodeUsage *slov1alpha1.ResourceMap, prodPod bool) (map[corev1.ResourceName]int64, error) {
if nodeMetric == nil {
return nil, nil
}
podMetrics := buildPodMetricMap(p.podLister, nodeMetric, prodPod)

estimatedUsed, err := p.estimator.EstimatePod(pod)
if err != nil {
return 0, nil
return nil, err
}
assignedPodEstimatedUsed, estimatedPods := p.estimatedAssignedPodUsed(nodeName, nodeMetric, podMetrics, prodPod)
for resourceName, value := range assignedPodEstimatedUsed {
Expand All @@ -310,12 +271,6 @@ func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *co
}
} else {
if nodeMetric.Status.NodeMetric != nil {
var nodeUsage *slov1alpha1.ResourceMap
if scoreWithAggregation(p.args.Aggregated) {
nodeUsage = getTargetAggregatedUsage(nodeMetric, &p.args.Aggregated.ScoreAggregatedDuration, p.args.Aggregated.ScoreAggregationType)
} else {
nodeUsage = &nodeMetric.Status.NodeMetric.NodeUsage
}
if nodeUsage != nil {
for resourceName, quantity := range nodeUsage.ResourceList {
if q := estimatedPodActualUsages[resourceName]; !q.IsZero() {
Expand All @@ -329,13 +284,30 @@ func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *co
}
}
}
return estimatedUsed, nil
}

allocatable, err := p.estimator.EstimateNode(node)
if err != nil {
return 0, nil
// filterNodeUsage checks the estimated usage of each resource against its
// configured threshold and reports the node as unschedulable when a threshold
// is exceeded.
//
// For every entry in usageThresholds the estimated usage taken from
// estimatedUsed is expressed as a rounded percentage of the total derived from
// allocatable via getResourceValue. Entries whose threshold is zero, or whose
// total is zero, are skipped. A resource passes when its percentage is less
// than or equal to the threshold; only a strictly greater percentage rejects
// the node.
//
// The returned status is framework.Unschedulable with a message built from
// ErrReasonUsageExceedThreshold, or from ErrReasonAggregatedUsageExceedThreshold
// when prodPod is false and filterProfile.AggregatedUsage is set. filterProfile
// is dereferenced whenever prodPod is false, so it must be non-nil in that case.
// Because usageThresholds is a map, which offending resource is reported when
// several exceed their thresholds is not deterministic.
func filterNodeUsage(usageThresholds, estimatedUsed map[corev1.ResourceName]int64, allocatable corev1.ResourceList, prodPod bool, filterProfile *usageThresholdsFilterProfile) *framework.Status {
for resourceName, value := range usageThresholds {
// A zero threshold means no limit is enforced for this resource.
if value == 0 {
continue
}
// NOTE(review): presumably getResourceValue yields the total in the same unit
// as the values in estimatedUsed — confirm against its definition.
total := getResourceValue(resourceName, allocatable[resourceName])
// Skip resources with no total; this also avoids dividing by zero below.
if total == 0 {
continue
}
// Usage as a whole-number percentage of the total.
usage := int64(math.Round(float64(estimatedUsed[resourceName]) / float64(total) * 100))
if usage <= value {
continue
}

// Aggregated thresholds are only reported for non-prod evaluation.
reason := ErrReasonUsageExceedThreshold
if !prodPod && filterProfile.AggregatedUsage != nil {
reason = ErrReasonAggregatedUsageExceedThreshold
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(reason, resourceName))
}
// NOTE(review): the next two statements reference p and return two values,
// neither of which matches this function's signature. They look like
// removed-side lines left over from the rendered diff — confirm against the
// actual file. The fall-through result is presumably the `return nil` below.
score := loadAwareSchedulingScorer(p.args.ResourceWeights, estimatedUsed, allocatable)
return score, nil
return nil
}

func (p *Plugin) estimatedAssignedPodUsed(nodeName string, nodeMetric *slov1alpha1.NodeMetric, podMetrics map[string]corev1.ResourceList, filterProdPod bool) (map[corev1.ResourceName]int64, sets.String) {
Expand Down
Loading