Skip to content

Commit

Permalink
Merge pull request kubernetes-retired/contrib#1328 from mwielgus/use-…
Browse files Browse the repository at this point in the history
…cloud-provider

Cluster-autoscaler: use cloud provider interface in the code
  • Loading branch information
mwielgus authored Jul 11, 2016
2 parents e40ed20 + 303d989 commit 3684342
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 665 deletions.
4 changes: 0 additions & 4 deletions cluster-autoscaler/cloudprovider/cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ type NodeGroup interface {
// removed nodes are deleted completely)
TargetSize() (int, error)

// GetSampleNode returns a sample node that belongs to this node group, or error
// if occurred.
SampleNode() (*kube_api.Node, error)

// IncreaseSize increases the size of the node group. To delete a node you need
// to explicitly name it and use DeleteNode. This function should wait until
// node group size is updated.
Expand Down
43 changes: 7 additions & 36 deletions cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,17 @@ import (
kube_api "k8s.io/kubernetes/pkg/api"
)

// NodeProvider is a function that provides a list of nodes.
type NodeProvider func() ([]*kube_api.Node, error)

// GceCloudProvider implements CloudProvider interface.
type GceCloudProvider struct {
gceManager *GceManager
migs []*Mig
nodeProvider NodeProvider
gceManager *GceManager
migs []*Mig
}

// BuildGceCloudProvider builds CloudProvider implementation for GCE.
func BuildGceCloudProvider(gceManager *GceManager, nodeProvider NodeProvider, specs []string) (*GceCloudProvider, error) {
func BuildGceCloudProvider(gceManager *GceManager, specs []string) (*GceCloudProvider, error) {
gce := &GceCloudProvider{
gceManager: gceManager,
migs: make([]*Mig, 0),
nodeProvider: nodeProvider,
gceManager: gceManager,
migs: make([]*Mig, 0),
}
for _, spec := range specs {
if err := gce.addNodeGroup(spec); err != nil {
Expand All @@ -57,20 +52,6 @@ func (gce *GceCloudProvider) addNodeGroup(spec string) error {
if err != nil {
return err
}
nodes, err := gce.nodeProvider()
if err != nil {
return err
}
// TODO: revisit how sample nodes are chosen.
for _, node := range nodes {
if belongs, err := mig.Belongs(node); err == nil && belongs {
mig.sampleNode = node
break
}
}
if mig.sampleNode == nil {
return fmt.Errorf("no sample node found for %s", mig.Id())
}
gce.migs = append(gce.migs, mig)
gce.gceManager.RegisterMig(mig)
return nil
Expand Down Expand Up @@ -129,9 +110,8 @@ type Mig struct {

gceManager *GceManager

minSize int
maxSize int
sampleNode *kube_api.Node
minSize int
maxSize int
}

// MaxSize returns maximum size of the node group.
Expand All @@ -151,15 +131,6 @@ func (mig *Mig) TargetSize() (int, error) {
return int(size), err
}

// SampleNode returns a sample node for the mig. Assumes that mig definition doesn't change over time.
// The node may not exist anymore.
func (mig *Mig) SampleNode() (*kube_api.Node, error) {
if mig.sampleNode != nil {
return mig.sampleNode, nil
}
return nil, fmt.Errorf("no sample node available")
}

// IncreaseSize increases Mig size
func (mig *Mig) IncreaseSize(delta int) error {
if delta <= 0 {
Expand Down
78 changes: 50 additions & 28 deletions cluster-autoscaler/cluster_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"net/http"
"net/url"
"os"
"strings"
"time"

"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/cloudprovider/gce"
"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_api "k8s.io/kubernetes/pkg/api"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
Expand All @@ -34,8 +36,22 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// MultiStringFlag is a flag for passing multiple parameters using same flag
type MultiStringFlag []string

// String returns string representation of the node groups.
func (flag *MultiStringFlag) String() string {
return "[" + strings.Join(*flag, " ") + "]"
}

// Set adds a new configuration.
func (flag *MultiStringFlag) Set(value string) error {
*flag = append(*flag, value)
return nil
}

var (
migConfigFlag config.MigConfigFlag
nodeGroupsFlag MultiStringFlag
address = flag.String("address", ":8085", "The address to expose prometheus metrics.")
kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default")
cloudConfig = flag.String("cloud-config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
Expand All @@ -52,13 +68,15 @@ var (
scaleDownTrialInterval = flag.Duration("scale-down-trial-interval", 1*time.Minute,
"How often scale down possiblity is check")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")

cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce")
)

func main() {
glog.Infof("Cluster Autoscaler %s", ClusterAutoscalerVersion)

flag.Var(&migConfigFlag, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+
"Can be used multiple times. Format: <min>:<max>:<migurl>")
flag.Var(&nodeGroupsFlag, "nodes", "sets min,max size and other configuration data for a node group in a format accepted by cloud provider."+
"Can be used multiple times. Format: <min>:<max>:<other...>")
flag.Parse()

go func() {
Expand All @@ -77,27 +95,6 @@ func main() {
if err != nil {
glog.Fatalf("Failed to build Kuberentes client configuration: %v", err)
}
migConfigs := make([]*config.MigConfig, 0, len(migConfigFlag))
for i := range migConfigFlag {
migConfigs = append(migConfigs, &migConfigFlag[i])
}

// GCE Manager
var gceManager *gce.GceManager
var gceError error
if *cloudConfig != "" {
config, fileErr := os.Open(*cloudConfig)
if fileErr != nil {
glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", *cloudConfig, err)
}
defer config.Close()
gceManager, gceError = gce.CreateGceManager(migConfigs, config)
} else {
gceManager, gceError = gce.CreateGceManager(migConfigs, nil)
}
if gceError != nil {
glog.Fatalf("Failed to create GCE Manager: %v", err)
}

kubeClient := kube_client.NewOrDie(kubeConfig)

Expand All @@ -118,6 +115,31 @@ func main() {
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
recorder := eventBroadcaster.NewRecorder(kube_api.EventSource{Component: "cluster-autoscaler"})

var cloudProvider cloudprovider.CloudProvider

if *cloudProviderFlag == "gce" {
// GCE Manager
var gceManager *gce.GceManager
var gceError error
if *cloudConfig != "" {
config, fileErr := os.Open(*cloudConfig)
if fileErr != nil {
glog.Fatalf("Couldn't open cloud provider configuration %s: %#v", *cloudConfig, err)
}
defer config.Close()
gceManager, gceError = gce.CreateGceManager(config)
} else {
gceManager, gceError = gce.CreateGceManager(nil)
}
if gceError != nil {
glog.Fatalf("Failed to create GCE Manager: %v", err)
}
cloudProvider, err = gce.BuildGceCloudProvider(gceManager, nodeGroupsFlag)
if err != nil {
glog.Fatalf("Failed to create GCE cloud provider: %v", err)
}
}

for {
select {
case <-time.After(*scanInterval):
Expand All @@ -135,7 +157,7 @@ func main() {
continue
}

if err := CheckMigsAndNodes(nodes, gceManager); err != nil {
if err := CheckGroupsAndNodes(nodes, cloudProvider); err != nil {
glog.Warningf("Cluster is not ready for autoscaling: %v", err)
continue
}
Expand Down Expand Up @@ -188,7 +210,7 @@ func main() {
} else {
scaleUpStart := time.Now()
updateLastTime("scaleup")
scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, migConfigs, gceManager, kubeClient, predicateChecker, recorder)
scaledUp, err := ScaleUp(unschedulablePodsToHelp, nodes, cloudProvider, kubeClient, predicateChecker, recorder)

updateDuration("scaleup", scaleUpStart)

Expand Down Expand Up @@ -245,7 +267,7 @@ func main() {
unneededNodes,
*scaleDownUnneededTime,
allScheduled,
gceManager, kubeClient, predicateChecker)
cloudProvider, kubeClient, predicateChecker)

updateDuration("scaledown", scaleDownStart)

Expand Down
115 changes: 0 additions & 115 deletions cluster-autoscaler/config/migconfig.go

This file was deleted.

40 changes: 0 additions & 40 deletions cluster-autoscaler/config/migconfig_test.go

This file was deleted.

Loading

0 comments on commit 3684342

Please sign in to comment.