[VMs pool] Add implementation for VMs pool #6951

Status: Open. Wants to merge 16 commits into base: master.
89 changes: 62 additions & 27 deletions cluster-autoscaler/cloudprovider/azure/azure_cache.go
@@ -18,12 +18,14 @@ package azure

import (
"context"
"fmt"
"reflect"
"regexp"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v5"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute"
"github.com/Azure/go-autorest/autorest/to"
"github.com/Azure/skewer"
@@ -66,12 +68,17 @@ type azureCache struct {
// Cache content.

// resourceGroup specifies the name of the resource group that this cache tracks
-	resourceGroup string
+	resourceGroup        string
+	clusterResourceGroup string
+	clusterName          string

+	// enableVMsAgentPool specifies whether VMs agent pool type is supported.
+	enableVMsAgentPool bool

// vmType can be one of vmTypeVMSS (default), vmTypeStandard
vmType string

-	vmsPoolSet map[string]struct{} // track the nodepools that're vms pool
+	vmsPoolMap map[string]armcontainerservice.AgentPool // track the node pools that are VMs pools

// scaleSets keeps the set of all known scalesets in the resource group, populated/refreshed via VMSS.List() call.
// It is only used/populated if vmType is vmTypeVMSS (default).
@@ -104,8 +111,11 @@ func newAzureCache(client *azClient, cacheTTL time.Duration, config Config) (*az
azClient: client,
refreshInterval: cacheTTL,
resourceGroup: config.ResourceGroup,
+		clusterResourceGroup: config.ClusterResourceGroup,
+		clusterName:          config.ClusterName,
+		enableVMsAgentPool:   config.EnableVMsAgentPool,
vmType: config.VMType,
-		vmsPoolSet:           make(map[string]struct{}),
+		vmsPoolMap:           make(map[string]armcontainerservice.AgentPool),
scaleSets: make(map[string]compute.VirtualMachineScaleSet),
virtualMachines: make(map[string][]compute.VirtualMachine),
registeredNodeGroups: make([]cloudprovider.NodeGroup, 0),
@@ -126,11 +136,11 @@ func newAzureCache(client *azClient, cacheTTL time.Duration, config Config) (*az
return cache, nil
}

-func (m *azureCache) getVMsPoolSet() map[string]struct{} {
+func (m *azureCache) getVMsPoolMap() map[string]armcontainerservice.AgentPool {
m.mutex.Lock()
defer m.mutex.Unlock()

-	return m.vmsPoolSet
+	return m.vmsPoolMap
}

func (m *azureCache) getVirtualMachines() map[string][]compute.VirtualMachine {
@@ -226,13 +236,19 @@ func (m *azureCache) fetchAzureResources() error {
return err
}
m.scaleSets = vmssResult
-	vmResult, vmsPoolSet, err := m.fetchVirtualMachines()
+	// we fetch both sets of resources since CAS may operate on mixed nodepools
+	vmResult, err := m.fetchVirtualMachines()
if err != nil {
return err
}
-	// we fetch both sets of resources since CAS may operate on mixed nodepools
	m.virtualMachines = vmResult
-	m.vmsPoolSet = vmsPoolSet
+	if m.enableVMsAgentPool {
+		vmsPoolMap, err := m.fetchVMsPools()
+		if err != nil {
+			return err
+		}
+		m.vmsPoolMap = vmsPoolMap
+	}

return nil
}
@@ -245,19 +261,17 @@ const (
)

// fetchVirtualMachines returns the updated list of virtual machines in the config resource group using the Azure API.
-func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine, map[string]struct{}, error) {
+func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine, error) {
ctx, cancel := getContextWithCancel()
defer cancel()

result, err := m.azClient.virtualMachinesClient.List(ctx, m.resourceGroup)
if err != nil {
klog.Errorf("VirtualMachinesClient.List in resource group %q failed: %v", m.resourceGroup, err)
-		return nil, nil, err.Error()
+		return nil, err.Error()
}

instances := make(map[string][]compute.VirtualMachine)
-	// track the nodepools that're vms pools
-	vmsPoolSet := make(map[string]struct{})
for _, instance := range result {
if instance.Tags == nil {
continue
@@ -274,20 +288,8 @@ func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine
}

instances[to.String(vmPoolName)] = append(instances[to.String(vmPoolName)], instance)

-		// if the nodepool is already in the map, skip it
-		if _, ok := vmsPoolSet[to.String(vmPoolName)]; ok {
-			continue
-		}
-
-		// nodes from vms pool will have tag "aks-managed-agentpool-type" set to "VirtualMachines"
-		if agentpoolType := tags[agentpoolTypeTag]; agentpoolType != nil {
-			if strings.EqualFold(to.String(agentpoolType), vmsPoolType) {
-				vmsPoolSet[to.String(vmPoolName)] = struct{}{}
-			}
-		}
	}
-	return instances, vmsPoolSet, nil
+	return instances, nil
}

// fetchScaleSets returns the updated list of scale sets in the config resource group using the Azure API.
@@ -308,6 +310,39 @@ func (m *azureCache) fetchScaleSets() (map[string]compute.VirtualMachineScaleSet
return sets, nil
}

+// fetchVMsPools returns the set of VMs pools in the cluster
+func (m *azureCache) fetchVMsPools() (map[string]armcontainerservice.AgentPool, error) {
+	ctx, cancel := getContextWithTimeout(vmsListRequestContextTimeout)
+	defer cancel()
+
+	if m.azClient.agentPoolClient == nil {
+		return nil, fmt.Errorf("agentPoolClient is nil")
+	}
+
+	vmsPoolMap := make(map[string]armcontainerservice.AgentPool)
+
+	pager := m.azClient.agentPoolClient.NewListPager(m.clusterResourceGroup, m.clusterName, nil)
+	var aps []*armcontainerservice.AgentPool
+	for pager.More() {
+		resp, err := pager.NextPage(ctx)
+		if err != nil {
+			klog.Errorf("agentPoolClient.pager.NextPage in cluster %s resource group %s failed: %v",
+				m.clusterName, m.clusterResourceGroup, err)
+			return nil, err
+		}
+		aps = append(aps, resp.Value...)
+	}
+
+	for _, ap := range aps {
+		if ap != nil && ap.Name != nil && ap.Properties != nil && ap.Properties.Type != nil &&
+			*ap.Properties.Type == armcontainerservice.AgentPoolTypeVirtualMachines {
+			vmsPoolMap[*ap.Name] = *ap
+		}
+	}
+
+	return vmsPoolMap, nil
+}
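
For orientation, a minimal sketch of how the cached map is meant to be consumed; isVMsPool here is a hypothetical helper for illustration, not part of this diff:

// isVMsPool reports whether the named agent pool is a VirtualMachines-type
// pool, based on the AgentPool objects cached by fetchVMsPools above.
func isVMsPool(cache *azureCache, poolName string) bool {
	_, ok := cache.getVMsPoolMap()[poolName]
	return ok
}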

// Register registers a node group if it hasn't been registered.
func (m *azureCache) Register(nodeGroup cloudprovider.NodeGroup) bool {
m.mutex.Lock()
@@ -417,7 +452,7 @@ func (m *azureCache) HasInstance(providerID string) (bool, error) {

// FindForInstance returns node group of the given Instance
func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) {
-	vmsPoolSet := m.getVMsPoolSet()
+	vmsPoolMap := m.getVMsPoolMap()
m.mutex.Lock()
defer m.mutex.Unlock()

@@ -436,7 +471,7 @@ func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudpr
}

// cluster with vmss pool only
if vmType == vmTypeVMSS && len(vmsPoolSet) == 0 {
if vmType == vmTypeVMSS && len(vmsPoolMap) == 0 {
if m.areAllScaleSetsUniform() {
// Omit virtual machines not managed by vmss only in case of uniform scale set.
if ok := virtualMachineRE.Match([]byte(inst.Name)); ok {
34 changes: 34 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_cache_test.go
@@ -19,8 +19,10 @@ package azure
import (
"testing"

"go.uber.org/mock/gomock"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v5"
"github.com/stretchr/testify/assert"
)

@@ -70,3 +72,35 @@ func TestFindForInstance(t *testing.T) {
assert.NoError(t, err)
assert.True(t, ac.unownedInstances[inst])
}

+func TestFetchVMsPools(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	provider := newTestProvider(t)
+	ac := provider.azureManager.azureCache
+	mockAgentpoolclient := NewMockAgentPoolsClient(ctrl)
+	ac.azClient.agentPoolClient = mockAgentpoolclient
+
+	vmsPoolName := "vmspool1"
+	vmsPool := getTestVMsAgentPool(vmsPoolName, false)
+	vmssPoolName := "vmsspool1"
+	vmssPoolType := armcontainerservice.AgentPoolTypeVirtualMachineScaleSets
+	vmssPool := armcontainerservice.AgentPool{
+		Name: &vmssPoolName,
+		Properties: &armcontainerservice.ManagedClusterAgentPoolProfileProperties{
+			Type: &vmssPoolType,
+		},
+	}
+	invalidPool := armcontainerservice.AgentPool{}
+	fakeAPListPager := getFakeAgentpoolListPager(&vmsPool, &vmssPool, &invalidPool)
+	mockAgentpoolclient.EXPECT().NewListPager(gomock.Any(), gomock.Any(), nil).
+		Return(fakeAPListPager)
+
+	vmsPoolMap, err := ac.fetchVMsPools()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(vmsPoolMap))
+
+	_, ok := vmsPoolMap[vmsPoolName]
+	assert.True(t, ok)
+}
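
getTestVMsAgentPool and getFakeAgentpoolListPager are helpers defined elsewhere in this PR; a plausible sketch of the former is shown below purely so the test reads in isolation (the real helper may differ):

func getTestVMsAgentPool(name string, isSystemPool bool) armcontainerservice.AgentPool {
	mode := armcontainerservice.AgentPoolModeUser
	if isSystemPool {
		mode = armcontainerservice.AgentPoolModeSystem
	}
	// A VMs pool is simply an agent pool whose Type is VirtualMachines.
	poolType := armcontainerservice.AgentPoolTypeVirtualMachines
	return armcontainerservice.AgentPool{
		Name: &name,
		Properties: &armcontainerservice.ManagedClusterAgentPoolProfileProperties{
			Type: &poolType,
			Mode: &mode,
		},
	}
}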
66 changes: 37 additions & 29 deletions cluster-autoscaler/cloudprovider/azure/azure_client.go
@@ -33,7 +33,7 @@ import (
azurecore_policy "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v5"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources"
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-02-01/storage"
@@ -148,7 +148,7 @@ func (az *azDeploymentsClient) Delete(ctx context.Context, resourceGroupName, de
return future.Response(), err
}

//go:generate sh -c "mockgen k8s.io/autoscaler/cluster-autoscaler/cloudprovider/azure AgentPoolsClient >./agentpool_client.go"
//go:generate sh -c "mockgen -source=azure_client.go -destination azure_mock_agentpool_client.go -package azure -exclude_interfaces DeploymentsClient"

// AgentPoolsClient defines the methods needed for scaling VMs pools.
// It is implemented by the track2 SDK type armcontainerservice.AgentPoolsClient.
@@ -169,41 +169,47 @@ type AgentPoolsClient interface {
machines armcontainerservice.AgentPoolDeleteMachinesParameter,
options *armcontainerservice.AgentPoolsClientBeginDeleteMachinesOptions) (
*runtime.Poller[armcontainerservice.AgentPoolsClientDeleteMachinesResponse], error)
+	NewListPager(
+		resourceGroupName, resourceName string,
+		options *armcontainerservice.AgentPoolsClientListOptions,
+	) *runtime.Pager[armcontainerservice.AgentPoolsClientListResponse]
}
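
For context, a minimal sketch of constructing the track2 client that satisfies this interface; the use of DefaultAzureCredential here is an assumption for illustration, not how this PR wires credentials (see getAgentpoolClientCredentials below):

func newSDKAgentPoolsClient(subscriptionID string) (AgentPoolsClient, error) {
	// DefaultAzureCredential is illustrative; the PR builds credentials
	// from Config via getAgentpoolClientCredentials.
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		return nil, err
	}
	// *armcontainerservice.AgentPoolsClient implements AgentPoolsClient,
	// including the NewListPager method added above.
	client, err := armcontainerservice.NewAgentPoolsClient(subscriptionID, cred, nil)
	if err != nil {
		return nil, err
	}
	return client, nil
}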

func getAgentpoolClientCredentials(cfg *Config) (azcore.TokenCredential, error) {
-	var cred azcore.TokenCredential
-	var err error
-	if cfg.AuthMethod == authMethodCLI {
-		cred, err = azidentity.NewAzureCLICredential(&azidentity.AzureCLICredentialOptions{
-			TenantID: cfg.TenantID})
-		if err != nil {
-			klog.Errorf("NewAzureCLICredential failed: %v", err)
-			return nil, err
+	if cfg.AuthMethod == "" || cfg.AuthMethod == authMethodPrincipal {
+		// Use MSI
+		if cfg.UseManagedIdentityExtension {
+			// Use System Assigned MSI
+			if len(cfg.UserAssignedIdentityID) == 0 {
+				klog.V(4).Info("Agentpool client: using System Assigned MSI to retrieve access token")
+				return azidentity.NewManagedIdentityCredential(nil)
+			}
+			// Use User Assigned MSI
+			klog.V(4).Info("Agentpool client: using User Assigned MSI to retrieve access token")
+			return azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{
+				ID: azidentity.ClientID(cfg.UserAssignedIdentityID),
+			})
+		}
-	} else if cfg.AuthMethod == "" || cfg.AuthMethod == authMethodPrincipal {
-		cred, err = azidentity.NewClientSecretCredential(cfg.TenantID, cfg.AADClientID, cfg.AADClientSecret, nil)
-		if err != nil {
-			klog.Errorf("NewClientSecretCredential failed: %v", err)
-			return nil, err

+		// Use Service Principal
+		if len(cfg.AADClientID) > 0 && len(cfg.AADClientSecret) > 0 {
[Review thread on this line]
Contributor: Does this support UseWorkloadIdentityExtension?
Author: I'm not really sure...but it should be fairly easy to add it if needed later.
Contributor: It won't be used for managed, but it is the preferred method of authentication for self-hosted at the moment. I would recommend you add it.
Contributor: Adding to what robin said, the workload identity extension gets addressed in this function below: https://github.com/kubernetes/autoscaler/pull/6951/files#diff-e18278caa19fbc44db54dcd3c6d8e77d045fff33e3a7cd0b6a65f5a111543b83R288-R354 so I'd look into reusing this function. We use workload identity for local testing + dev.
Author: Updated the code to support UseWorkloadIdentityExtension.
[End review thread]
+			klog.V(2).Infoln("Agentpool client: using client_id+client_secret to retrieve access token")
+			return azidentity.NewClientSecretCredential(cfg.TenantID, cfg.AADClientID, cfg.AADClientSecret, nil)
+		}
-	} else {
-		return nil, fmt.Errorf("unsupported authorization method: %s", cfg.AuthMethod)
	}
-	return cred, nil
-}

-func getAgentpoolClientRetryOptions(cfg *Config) azurecore_policy.RetryOptions {
-	if cfg.AuthMethod == authMethodCLI {
-		return azurecore_policy.RetryOptions{
-			MaxRetries: -1, // no retry when using CLI auth for UT
-		}
-	}
+	if cfg.UseWorkloadIdentityExtension {
+		klog.V(4).Info("Agentpool client: using workload identity for access token")
+		return azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{
+			TokenFilePath: cfg.AADFederatedTokenFile,
+		})
+	}
-	return azextensions.DefaultRetryOpts()
-}

+	return nil, fmt.Errorf("unsupported authorization method: %s", cfg.AuthMethod)
}

func newAgentpoolClient(cfg *Config) (AgentPoolsClient, error) {
-	retryOptions := getAgentpoolClientRetryOptions(cfg)
+	retryOptions := azextensions.DefaultRetryOpts()

if cfg.ARMBaseURLForAPClient != "" {
klog.V(10).Infof("Using ARMBaseURLForAPClient to create agent pool client")
@@ -404,9 +410,11 @@ func newAzClient(cfg *Config, env *azure.Environment) (*azClient, error) {

agentPoolClient, err := newAgentpoolClient(cfg)
if err != nil {
-		// we don't want to fail the whole process so we don't break any existing functionality
-		// since this may not be fatal - it is only used by vms pool which is still under development.
-		klog.Warningf("newAgentpoolClient failed with error: %s", err)
+		klog.Errorf("newAgentpoolClient failed with error: %s", err)
+		if cfg.EnableVMsAgentPool {
+			// only return error if VMs agent pool is supported
+			return nil, err
+		}
}

return &azClient{
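
Taken together, the new code paths are driven by a handful of Config fields. A hedged sketch of a minimal configuration exercising the VMs pool support follows; ClusterName, ClusterResourceGroup, and EnableVMsAgentPool appear in this diff, while the literal values and any other wiring are illustrative only:

cfg := Config{
	ClusterName:          "my-aks-cluster", // used by fetchVMsPools to list agent pools
	ClusterResourceGroup: "my-cluster-rg",  // resource group of the managed cluster
	EnableVMsAgentPool:   true,             // gates fetchVMsPools and makes agentPoolClient errors fatal
}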