Skip to content

Commit

Permalink
Add ProvisioningRequestProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Jan 31, 2024
1 parent ed6ebbe commit 0e74dcc
Show file tree
Hide file tree
Showing 16 changed files with 359 additions and 92 deletions.
4 changes: 4 additions & 0 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
}

if err := a.processors.ProvisioningRequestProcessor.Process(); err != nil {
klog.Errorf("Failed to process ProvisioningRequests, err: %v", err)
}

unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)

if err != nil {
Expand Down
25 changes: 15 additions & 10 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

Expand Down Expand Up @@ -469,15 +470,6 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
}

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
Expand All @@ -487,14 +479,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
ScaleUpOrchestrator: orchestrator.New(),
}

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)

if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(kubeClient)
if err != nil {
return nil, err
}
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provReqProcessor, err := checkcapacity.NewCheckCapacityProcessor(kubeClient)
if err != nil {
return nil, err
}
opts.Processors.ProvisioningRequestProcessor = provReqProcessor
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
26 changes: 15 additions & 11 deletions cluster-autoscaler/processors/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)
Expand Down Expand Up @@ -73,7 +74,8 @@ type AutoscalingProcessors struct {
// * scale-downs per nodegroup
// * scale-up failures per nodegroup
// * scale-down failures per nodegroup
ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
ProvisioningRequestProcessor provreq.ProvisoningRequestProcessor
}

// DefaultProcessors returns default set of processors.
Expand All @@ -95,16 +97,17 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors
nodes.NewAtomicResizeFilteringProcessor(),
},
),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(),
AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(),
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false),
ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(),
ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(),
ProvisioningRequestProcessor: provreq.NewDefaultProvisioningRequestProcessor(),
}
}

Expand All @@ -124,4 +127,5 @@ func (ap *AutoscalingProcessors) CleanUp() {
ap.CustomResourcesProcessor.CleanUp()
ap.TemplateNodeInfoProvider.CleanUp()
ap.ActionableClusterProcessor.CleanUp()
ap.ProvisioningRequestProcessor.CleanUp()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

// ProvisoningRequestProcessor process ProvisionignRequests in the cluster.
type ProvisoningRequestProcessor interface {
Process() error
CleanUp()
}

// NoOpProvisoningRequestProcessor do nothing.
type NoOpProvisoningRequestProcessor struct {
}

// NewDefaultProvisioningRequestProcessor creates an instance of PodListProcessor.
func NewDefaultProvisioningRequestProcessor() ProvisoningRequestProcessor {
return &NoOpProvisoningRequestProcessor{}
}

// Process processes lists of unschedulable and scheduled pods before scaling of the cluster.
func (p *NoOpProvisoningRequestProcessor) Process() error {
return nil
}

// CleanUp cleans up the processor's internal structures.
func (p *NoOpProvisoningRequestProcessor) CleanUp() {
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,8 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// CapacityFound indicates that all of the requested resources were
// fount in the cluster.
CapacityFound string = "CapacityFound"
// Expired indicates that the ProvisioningRequest had CapacityFound condition before
// and the reservation time is expired.
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
// Provisioned indicates that all of the requested resources were created
// and are available in the cluster. CA will set this condition when the
Expand Down
23 changes: 12 additions & 11 deletions cluster-autoscaler/provisioningrequest/checkcapacity/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,27 @@ limitations under the License.
package checkcapacity

import (
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/klog/v2"
)

const (
defaultReservationTime = 10 * time.Minute
defaultExpirationTime = 7 * 24 * time.Hour // 7 days
)

const (
//CapacityIsNotFoundReason is added when capacity was not found in the cluster.
CapacityIsNotFoundReason = "CapacityIsNotFound"
//CapacityIsFoundReason is added when capacity was found in the cluster.
CapacityIsFoundReason = "CapacityIsFound"
//FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster.
FailedToBookCapacityReason = "FailedToBookCapacity"
//CapacityReservationTimeExpiredReason is added whed capacity reservation time is expired.
CapacityReservationTimeExpiredReason = "CapacityReservationTimeExpired"
//CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired.
CapacityReservationTimeExpiredMsg = "Capacity reservation time is expired"
//ExpiredReason is added if ProvisioningRequest is expired.
ExpiredReason = "Expired"
//ExpiredMsg is added if ProvisioningRequest is expired.
ExpiredMsg = "ProvisioningRequest is expired"
)

func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
Expand All @@ -50,26 +51,26 @@ func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
for _, condition := range pr.Conditions() {
if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) {
return false
} else if checkConditionType(condition, v1beta1.CapacityFound) {
} else if checkConditionType(condition, v1beta1.Provisioned) {
book = true
}
}
return book
}

func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) {
func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string, now v1.Time) {
var newConditions []v1.Condition
newCondition := v1.Condition{
Type: conditionType,
Status: conditionStatus,
ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(),
LastTransitionTime: v1.Now(),
LastTransitionTime: now,
Reason: reason,
Message: message,
}
prevConditions := pr.Conditions()
switch conditionType {
case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed:
case v1beta1.Provisioned, v1beta1.BookingExpired, v1beta1.Failed:
conditionFound := false
for _, condition := range prevConditions {
if condition.Type == conditionType {
Expand Down
Loading

0 comments on commit 0e74dcc

Please sign in to comment.