Skip to content

Commit

Permalink
chore: backport latest VMs pool and HasInstance() implementations, fo…
Browse files Browse the repository at this point in the history
…r 1.30 (#7201)

* chore: backport latest VMs pool and HasInstance() implementations

* doc: fix header for vms pool files

* chore: vendor
  • Loading branch information
comtalyst authored Aug 26, 2024
1 parent 7a5418f commit 7a46404
Show file tree
Hide file tree
Showing 54 changed files with 6,792 additions and 481 deletions.
106 changes: 83 additions & 23 deletions cluster-autoscaler/cloudprovider/azure/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type azureCache struct {
// vmType can be one of vmTypeVMSS (default), vmTypeStandard
vmType string

vmsPoolSet map[string]struct{} // track the nodepools that're vms pool

// scaleSets keeps the set of all known scalesets in the resource group, populated/refreshed via VMSS.List() call.
// It is only used/populated if vmType is vmTypeVMSS (default).
scaleSets map[string]compute.VirtualMachineScaleSet
Expand Down Expand Up @@ -103,6 +105,7 @@ func newAzureCache(client *azClient, cacheTTL time.Duration, config Config) (*az
refreshInterval: cacheTTL,
resourceGroup: config.ResourceGroup,
vmType: config.VMType,
vmsPoolSet: make(map[string]struct{}),
scaleSets: make(map[string]compute.VirtualMachineScaleSet),
virtualMachines: make(map[string][]compute.VirtualMachine),
registeredNodeGroups: make([]cloudprovider.NodeGroup, 0),
Expand All @@ -123,6 +126,13 @@ func newAzureCache(client *azClient, cacheTTL time.Duration, config Config) (*az
return cache, nil
}

func (m *azureCache) getVMsPoolSet() map[string]struct{} {
m.mutex.Lock()
defer m.mutex.Unlock()

return m.vmsPoolSet
}

func (m *azureCache) getVirtualMachines() map[string][]compute.VirtualMachine {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand Down Expand Up @@ -197,58 +207,87 @@ func (m *azureCache) regenerate() error {
return nil
}

// fetchAzureResources retrieves and updates the cached Azure resources.
//
// This function performs the following:
// - Fetches and updates the list of Virtual Machine Scale Sets (VMSS) in the specified resource group.
// - Fetches and updates the list of Virtual Machines (VMs) and identifies the node pools they belong to.
// - Maintains a set of VMs pools and VMSS resources which helps the Cluster Autoscaler (CAS) operate on mixed node pools.
//
// Returns an error if any of the Azure API calls fail.
func (m *azureCache) fetchAzureResources() error {
m.mutex.Lock()
defer m.mutex.Unlock()

switch m.vmType {
case vmTypeVMSS:
// List all VMSS in the RG.
vmssResult, err := m.fetchScaleSets()
if err == nil {
m.scaleSets = vmssResult
} else {
return err
}
case vmTypeStandard:
// List all VMs in the RG.
vmResult, err := m.fetchVirtualMachines()
if err == nil {
m.virtualMachines = vmResult
} else {
return err
}
// NOTE: this lists virtual machine scale sets, not virtual machine
// scale set instances
vmssResult, err := m.fetchScaleSets()
if err != nil {
return err
}
m.scaleSets = vmssResult
vmResult, vmsPoolSet, err := m.fetchVirtualMachines()
if err != nil {
return err
}
// we fetch both sets of resources since CAS may operate on mixed nodepools
m.virtualMachines = vmResult
m.vmsPoolSet = vmsPoolSet

return nil
}

const (
legacyAgentpoolNameTag = "poolName"
agentpoolNameTag = "aks-managed-poolName"
agentpoolTypeTag = "aks-managed-agentpool-type"
vmsPoolType = "VirtualMachines"
)

// fetchVirtualMachines returns the updated list of virtual machines in the config resource group using the Azure API.
func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine, error) {
func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine, map[string]struct{}, error) {
ctx, cancel := getContextWithCancel()
defer cancel()

result, err := m.azClient.virtualMachinesClient.List(ctx, m.resourceGroup)
if err != nil {
klog.Errorf("VirtualMachinesClient.List in resource group %q failed: %v", m.resourceGroup, err)
return nil, err.Error()
return nil, nil, err.Error()
}

instances := make(map[string][]compute.VirtualMachine)
// track the nodepools that're vms pools
vmsPoolSet := make(map[string]struct{})
for _, instance := range result {
if instance.Tags == nil {
continue
}

tags := instance.Tags
vmPoolName := tags["poolName"]
vmPoolName := tags[agentpoolNameTag]
// fall back to legacy tag name if not found
if vmPoolName == nil {
vmPoolName = tags[legacyAgentpoolNameTag]
}
if vmPoolName == nil {
continue
}

instances[to.String(vmPoolName)] = append(instances[to.String(vmPoolName)], instance)

// if the nodepool is already in the map, skip it
if _, ok := vmsPoolSet[to.String(vmPoolName)]; ok {
continue
}

// nodes from vms pool will have tag "aks-managed-agentpool-type" set to "VirtualMachines"
if agentpoolType := tags[agentpoolTypeTag]; agentpoolType != nil {
if strings.EqualFold(to.String(agentpoolType), vmsPoolType) {
vmsPoolSet[to.String(vmPoolName)] = struct{}{}
}
}
}
return instances, nil
return instances, vmsPoolSet, nil
}

// fetchScaleSets returns the updated list of scale sets in the config resource group using the Azure API.
Expand Down Expand Up @@ -280,7 +319,6 @@ func (m *azureCache) Register(nodeGroup cloudprovider.NodeGroup) bool {
// Node group is already registered and min/max size haven't changed, no action required.
return false
}

m.registeredNodeGroups[i] = nodeGroup
klog.V(4).Infof("Node group %q updated", nodeGroup.Id())
m.invalidateUnownedInstanceCache()
Expand All @@ -289,6 +327,7 @@ func (m *azureCache) Register(nodeGroup cloudprovider.NodeGroup) bool {
}

klog.V(4).Infof("Registering Node Group %q", nodeGroup.Id())

m.registeredNodeGroups = append(m.registeredNodeGroups, nodeGroup)
m.invalidateUnownedInstanceCache()
return true
Expand Down Expand Up @@ -357,8 +396,28 @@ func (m *azureCache) getAutoscalingOptions(ref azureRef) map[string]string {
return m.autoscalingOptions[ref]
}

// HasInstance returns if a given instance exists in the azure cache
func (m *azureCache) HasInstance(providerID string) (bool, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
resourceID, err := convertResourceGroupNameToLower(providerID)
if err != nil {
// Most likely an invalid resource id, we should return an error
// most of these shouldn't make it here do to higher level
// validation in the HasInstance azure.cloudprovider function
return false, err
}

if m.getInstanceFromCache(resourceID) != nil {
return true, nil
}
// couldn't find instance in the cache, assume it's deleted
return false, cloudprovider.ErrNotImplemented
}

// FindForInstance returns node group of the given Instance
func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) {
vmsPoolSet := m.getVMsPoolSet()
m.mutex.Lock()
defer m.mutex.Unlock()

Expand All @@ -376,7 +435,8 @@ func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudpr
return nil, nil
}

if vmType == vmTypeVMSS {
// cluster with vmss pool only
if vmType == vmTypeVMSS && len(vmsPoolSet) == 0 {
if m.areAllScaleSetsUniform() {
// Omit virtual machines not managed by vmss only in case of uniform scale set.
if ok := virtualMachineRE.Match([]byte(inst.Name)); ok {
Expand Down
131 changes: 131 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ import (
"os"
"time"

_ "go.uber.org/mock/mockgen/model" // for go:generate

azextensions "github.com/Azure/azure-sdk-for-go-extensions/pkg/middleware"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
azurecore_policy "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-02-01/storage"
Expand Down Expand Up @@ -138,6 +148,118 @@ func (az *azDeploymentsClient) Delete(ctx context.Context, resourceGroupName, de
return future.Response(), err
}

//go:generate sh -c "mockgen k8s.io/autoscaler/cluster-autoscaler/cloudprovider/azure AgentPoolsClient >./agentpool_client.go"

// AgentPoolsClient interface defines the methods needed for scaling vms pool.
// it is implemented by track2 sdk armcontainerservice.AgentPoolsClient
type AgentPoolsClient interface {
Get(ctx context.Context,
resourceGroupName, resourceName, agentPoolName string,
options *armcontainerservice.AgentPoolsClientGetOptions) (
armcontainerservice.AgentPoolsClientGetResponse, error)
BeginCreateOrUpdate(
ctx context.Context,
resourceGroupName, resourceName, agentPoolName string,
parameters armcontainerservice.AgentPool,
options *armcontainerservice.AgentPoolsClientBeginCreateOrUpdateOptions) (
*runtime.Poller[armcontainerservice.AgentPoolsClientCreateOrUpdateResponse], error)
BeginDeleteMachines(
ctx context.Context,
resourceGroupName, resourceName, agentPoolName string,
machines armcontainerservice.AgentPoolDeleteMachinesParameter,
options *armcontainerservice.AgentPoolsClientBeginDeleteMachinesOptions) (
*runtime.Poller[armcontainerservice.AgentPoolsClientDeleteMachinesResponse], error)
}

func getAgentpoolClientCredentials(cfg *Config) (azcore.TokenCredential, error) {
var cred azcore.TokenCredential
var err error
if cfg.AuthMethod == authMethodCLI {
cred, err = azidentity.NewAzureCLICredential(&azidentity.AzureCLICredentialOptions{
TenantID: cfg.TenantID})
if err != nil {
klog.Errorf("NewAzureCLICredential failed: %v", err)
return nil, err
}
} else if cfg.AuthMethod == "" || cfg.AuthMethod == authMethodPrincipal {
cred, err = azidentity.NewClientSecretCredential(cfg.TenantID, cfg.AADClientID, cfg.AADClientSecret, nil)
if err != nil {
klog.Errorf("NewClientSecretCredential failed: %v", err)
return nil, err
}
} else {
return nil, fmt.Errorf("unsupported authorization method: %s", cfg.AuthMethod)
}
return cred, nil
}

func getAgentpoolClientRetryOptions(cfg *Config) azurecore_policy.RetryOptions {
if cfg.AuthMethod == authMethodCLI {
return azurecore_policy.RetryOptions{
MaxRetries: -1, // no retry when using CLI auth for UT
}
}
return azextensions.DefaultRetryOpts()
}

func newAgentpoolClient(cfg *Config) (AgentPoolsClient, error) {
retryOptions := getAgentpoolClientRetryOptions(cfg)

if cfg.ARMBaseURLForAPClient != "" {
klog.V(10).Infof("Using ARMBaseURLForAPClient to create agent pool client")
return newAgentpoolClientWithConfig(cfg.SubscriptionID, nil, cfg.ARMBaseURLForAPClient, "UNKNOWN", retryOptions)
}

return newAgentpoolClientWithPublicEndpoint(cfg, retryOptions)
}

func newAgentpoolClientWithConfig(subscriptionID string, cred azcore.TokenCredential,
cloudCfgEndpoint, cloudCfgAudience string, retryOptions azurecore_policy.RetryOptions) (AgentPoolsClient, error) {
agentPoolsClient, err := armcontainerservice.NewAgentPoolsClient(subscriptionID, cred,
&policy.ClientOptions{
ClientOptions: azurecore_policy.ClientOptions{
Cloud: cloud.Configuration{
Services: map[cloud.ServiceName]cloud.ServiceConfiguration{
cloud.ResourceManager: {
Endpoint: cloudCfgEndpoint,
Audience: cloudCfgAudience,
},
},
},
Telemetry: azextensions.DefaultTelemetryOpts(getUserAgentExtension()),
Transport: azextensions.DefaultHTTPClient(),
Retry: retryOptions,
},
})

if err != nil {
return nil, fmt.Errorf("failed to init cluster agent pools client: %w", err)
}

klog.V(10).Infof("Successfully created agent pool client with ARMBaseURL")
return agentPoolsClient, nil
}

func newAgentpoolClientWithPublicEndpoint(cfg *Config, retryOptions azurecore_policy.RetryOptions) (AgentPoolsClient, error) {
cred, err := getAgentpoolClientCredentials(cfg)
if err != nil {
klog.Errorf("failed to get agent pool client credentials: %v", err)
return nil, err
}

// default to public cloud
env := azure.PublicCloud
if cfg.Cloud != "" {
env, err = azure.EnvironmentFromName(cfg.Cloud)
if err != nil {
klog.Errorf("failed to get environment from name %s: with error: %v", cfg.Cloud, err)
return nil, err
}
}

return newAgentpoolClientWithConfig(cfg.SubscriptionID, cred, env.ResourceManagerEndpoint, env.TokenAudience, retryOptions)
}

type azAccountsClient struct {
client storage.AccountsClient
}
Expand All @@ -151,6 +273,7 @@ type azClient struct {
disksClient diskclient.Interface
storageAccountsClient storageaccountclient.Interface
skuClient compute.ResourceSkusClient
agentPoolClient AgentPoolsClient
}

// newServicePrincipalTokenFromCredentials creates a new ServicePrincipalToken using values of the
Expand Down Expand Up @@ -279,6 +402,13 @@ func newAzClient(cfg *Config, env *azure.Environment) (*azClient, error) {
skuClient.UserAgent = azClientConfig.UserAgent
klog.V(5).Infof("Created sku client with authorizer: %v", skuClient)

agentPoolClient, err := newAgentpoolClient(cfg)
if err != nil {
// we don't want to fail the whole process so we don't break any existing functionality
// since this may not be fatal - it is only used by vms pool which is still under development.
klog.Warningf("newAgentpoolClient failed with error: %s", err)
}

return &azClient{
disksClient: disksClient,
interfacesClient: interfacesClient,
Expand All @@ -288,5 +418,6 @@ func newAzClient(cfg *Config, env *azure.Environment) (*azClient, error) {
virtualMachinesClient: virtualMachinesClient,
storageAccountsClient: storageAccountsClient,
skuClient: skuClient,
agentPoolClient: agentPoolClient,
}, nil
}
25 changes: 22 additions & 3 deletions cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package azure

import (
"fmt"
"io"
"os"
"strings"
Expand Down Expand Up @@ -122,9 +123,27 @@ func (azure *AzureCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovid
return azure.azureManager.GetNodeGroupForInstance(ref)
}

// HasInstance returns whether a given node has a corresponding instance in this cloud provider
func (azure *AzureCloudProvider) HasInstance(*apiv1.Node) (bool, error) {
return true, cloudprovider.ErrNotImplemented
// HasInstance returns whether a given node has a corresponding instance in this cloud provider.
//
// Used to prevent undercount of existing VMs (taint-based overcount of deleted VMs),
// and so should not return false, nil (no instance) if uncertain; return error instead.
// (Think "has instance for sure, else error".) Returning an error causes fallback to taint-based
// determination; use ErrNotImplemented for silent fallback, any other error will be logged.
//
// Expected behavior (should work for VMSS Uniform/Flex, and VMs):
// - exists : return true, nil
// - !exists : return *, ErrNotImplemented (could use custom error for autoscaled nodes)
// - unimplemented case : return *, ErrNotImplemented
// - any other error : return *, error
func (azure *AzureCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {
if node.Spec.ProviderID == "" {
return false, fmt.Errorf("ProviderID for node: %s is empty, skipped", node.Name)
}

if !strings.HasPrefix(node.Spec.ProviderID, "azure://") {
return false, fmt.Errorf("invalid azure ProviderID prefix for node: %s, skipped", node.Name)
}
return azure.azureManager.azureCache.HasInstance(node.Spec.ProviderID)
}

// Pricing returns pricing model for this cloud provider or error if not available.
Expand Down
Loading

0 comments on commit 7a46404

Please sign in to comment.