From cdd65b25f1dda734ad1420ce75440729ef1f63ea Mon Sep 17 00:00:00 2001 From: Steve Hipwell Date: Tue, 12 Apr 2022 15:27:27 +0100 Subject: [PATCH] feat: Add support for Azure user assigned identity Signed-off-by: Steve Hipwell --- docs/sources/configuration/_index.md | 31 ++++--- docs/sources/storage/_index.md | 8 +- go.mod | 2 +- pkg/loki/config_wrapper_test.go | 4 +- pkg/storage/bucket/azure/config.go | 2 +- pkg/storage/bucket/azure/config_test.go | 3 +- .../chunk/client/azure/blob_storage_client.go | 89 ++++++++++--------- 7 files changed, 76 insertions(+), 63 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 8eb1e5ce1f92..e495550aab11 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -761,19 +761,32 @@ The `azure_storage_config` configures Azure as a general storage for different d # CLI flag: -.azure.environment [environment: | default = "AzureGlobal"] -# Name of the blob container used to store chunks. This container must be -# created before running cortex. -# CLI flag: -.azure.container-name -[container_name: | default = "loki"] - -# The Microsoft Azure account name to be used +# Azure storage account name. # CLI flag: -.azure.account-name [account_name: | default = ""] -# The Microsoft Azure account key to use. +# Azure storage account key. # CLI flag: -.azure.account-key [account_key: | default = ""] +# Name of the storage account blob container used to store chunks. +# This container must be created before running Loki. +# CLI flag: -.azure.container-name +[container_name: | default = "loki"] + +# Azure storage endpoint suffix without schema. The storage account name will +# be prefixed to this value to create the FQDN. +# CLI flag: -.azure.endpoint-suffix +[endpoint_suffix: | default = ""] + +# Use Managed Identity to authenticate to the Azure storage account. +# CLI flag: -.azure.use-managed-identity +[use_managed_identity: | default = false] + +# User assigned identity ID to authenticate to the Azure storage account. +# CLI flag: -.azure.user-assigned-id +[user_assigned_id: | default = ""] + # Chunk delimiter to build the blobID # CLI flag: -.azure.chunk-delimiter [chunk_delimiter: | default = "-"] @@ -805,10 +818,6 @@ The `azure_storage_config` configures Azure as a general storage for different d # Maximum time to wait before retrying a request. # CLI flag: -.azure.max-retry-delay [max_retry_delay: | default = 500ms] - -# Use Managed Identity or not. -# CLI flag: -ruler.storage.azure.use-managed-identity -[use_managed_identity: | default = false] ``` ## gcs_storage_config diff --git a/docs/sources/storage/_index.md b/docs/sources/storage/_index.md index 924c2cc2103e..49ec0caca314 100644 --- a/docs/sources/storage/_index.md +++ b/docs/sources/storage/_index.md @@ -300,14 +300,16 @@ schema_config: store: boltdb-shipper storage_config: azure: + # Your Azure storage account name + account_name: # For the account-key, see docs: https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal account_key: - # Your azure account name - account_name: # See https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#containers container_name: - request_timeout: 0 use_managed_identity: + # Providing a user assigned ID will override use_managed_identity + user_assigned_id: + request_timeout: 0 boltdb_shipper: active_index_directory: /data/loki/boltdb-shipper-active cache_location: /data/loki/boltdb-shipper-cache diff --git a/go.mod b/go.mod index 78aaecdecd79..db886143a8eb 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/Azure/azure-pipeline-go v0.2.3 github.com/Azure/azure-storage-blob-go v0.13.0 github.com/Azure/go-autorest/autorest/adal v0.9.18 + github.com/Azure/go-autorest/autorest/azure/auth v0.5.8 github.com/Masterminds/sprig/v3 v3.2.2 github.com/NYTimes/gziphandler v1.1.1 github.com/Shopify/sarama v1.30.0 @@ -120,7 +121,6 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.11.24 // indirect - github.com/Azure/go-autorest/autorest/azure/auth v0.5.8 // indirect github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 49dbdbec6553..cbbec34f8b9f 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -360,8 +360,8 @@ memberlist: "should equal default environment since unspecified in config") assert.Equal(t, "milkyway", actual.ContainerName) - assert.Equal(t, "3rd_planet", actual.AccountName) - assert.Equal(t, "water", actual.AccountKey.String()) + assert.Equal(t, "3rd_planet", actual.StorageAccountName) + assert.Equal(t, "water", actual.StorageAccountKey.String()) assert.Equal(t, 27, actual.DownloadBufferSize) assert.Equal(t, 42, actual.UploadBufferSize) assert.Equal(t, 13, actual.UploadBufferCount) diff --git a/pkg/storage/bucket/azure/config.go b/pkg/storage/bucket/azure/config.go index 066d19703f1c..47ee6ea33673 100644 --- a/pkg/storage/bucket/azure/config.go +++ b/pkg/storage/bucket/azure/config.go @@ -28,7 +28,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name") f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key") - f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "", "Azure storage container name") + f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "loki", "Azure storage container name") f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN") f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors") cfg.Config.RegisterFlagsWithPrefix(prefix+"azure.", f) diff --git a/pkg/storage/bucket/azure/config_test.go b/pkg/storage/bucket/azure/config_test.go index ae9bd5ee19f6..5d3e40ed5436 100644 --- a/pkg/storage/bucket/azure/config_test.go +++ b/pkg/storage/bucket/azure/config_test.go @@ -13,7 +13,8 @@ import ( // defaultConfig should match the default flag values defined in RegisterFlagsWithPrefix. var defaultConfig = Config{ - MaxRetries: 20, + ContainerName: "loki", + MaxRetries: 20, Config: http.Config{ IdleConnTimeout: 90 * time.Second, ResponseHeaderTimeout: 2 * time.Minute, diff --git a/pkg/storage/chunk/client/azure/blob_storage_client.go b/pkg/storage/chunk/client/azure/blob_storage_client.go index e4fef88bcb63..5e2042fa0f37 100644 --- a/pkg/storage/chunk/client/azure/blob_storage_client.go +++ b/pkg/storage/chunk/client/azure/blob_storage_client.go @@ -15,6 +15,7 @@ import ( "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/go-autorest/autorest/adal" + "github.com/Azure/go-autorest/autorest/azure/auth" "github.com/grafana/dskit/flagext" "github.com/mattn/go-ieproxy" "github.com/prometheus/client_golang/prometheus" @@ -38,23 +39,12 @@ const ( var ( supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment} noClientKey = azblob.ClientProvidedKeyOptions{} - endpoints = map[string]struct{ blobURLFmt, containerURLFmt string }{ - azureGlobal: { - "https://%s.blob.core.windows.net/%s/%s", - "https://%s.blob.core.windows.net/%s", - }, - azureChinaCloud: { - "https://%s.blob.core.chinacloudapi.cn/%s/%s", - "https://%s.blob.core.chinacloudapi.cn/%s", - }, - azureGermanCloud: { - "https://%s.blob.core.cloudapi.de/%s/%s", - "https://%s.blob.core.cloudapi.de/%s", - }, - azureUSGovernment: { - "https://%s.blob.core.usgovcloudapi.net/%s/%s", - "https://%s.blob.core.usgovcloudapi.net/%s", - }, + + defaultEndpoints = map[string]string{ + azureGlobal: "blob.core.windows.net", + azureChinaCloud: "blob.core.chinacloudapi.cn", + azureGermanCloud: "blob.core.cloudapi.de", + azureUSGovernment: "blob.core.usgovcloudapi.net", } // default Azure http client. @@ -83,10 +73,13 @@ var ( // BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage. type BlobStorageConfig struct { Environment string `yaml:"environment"` + StorageAccountName string `yaml:"account_name"` + StorageAccountKey flagext.Secret `yaml:"account_key"` ContainerName string `yaml:"container_name"` - AccountName string `yaml:"account_name"` + Endpoint string `yaml:"endpoint_suffix"` + UseManagedIdentity bool `yaml:"use_managed_identity"` + UserAssignedID string `yaml:"user_assigned_id"` ChunkDelimiter string `yaml:"chunk_delimiter"` - AccountKey flagext.Secret `yaml:"account_key"` DownloadBufferSize int `yaml:"download_buffer_size"` UploadBufferSize int `yaml:"upload_buffer_size"` UploadBufferCount int `yaml:"upload_buffer_count"` @@ -94,7 +87,6 @@ type BlobStorageConfig struct { MaxRetries int `yaml:"max_retries"` MinRetryDelay time.Duration `yaml:"min_retry_delay"` MaxRetryDelay time.Duration `yaml:"max_retry_delay"` - UseManagedIdentity bool `yaml:"use_managed_identity"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -105,10 +97,13 @@ func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&c.Environment, prefix+"azure.environment", azureGlobal, fmt.Sprintf("Azure Cloud environment. Supported values are: %s.", strings.Join(supportedEnvironments, ", "))) - f.StringVar(&c.ContainerName, prefix+"azure.container-name", "loki", "Name of the blob container used to store chunks. This container must be created before running Loki.") - f.StringVar(&c.AccountName, prefix+"azure.account-name", "", "The Microsoft Azure account name to be used") + f.StringVar(&c.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name.") + f.Var(&c.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key.") + f.StringVar(&c.ContainerName, prefix+"azure.container-name", "loki", "Name of the storage account blob container used to store chunks. This container must be created before running cortex.") + f.StringVar(&c.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The storage account name will be prefixed to this value to create the FQDN.") + f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity to authenticate to the Azure storage account.") + f.StringVar(&c.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned identity ID to authenticate to the Azure storage account.") f.StringVar(&c.ChunkDelimiter, prefix+"azure.chunk-delimiter", "-", "Chunk delimiter for blob ID to be used") - f.Var(&c.AccountKey, prefix+"azure.account-key", "The Microsoft Azure account key to use.") f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage.") f.IntVar(&c.DownloadBufferSize, prefix+"azure.download-buffer-size", 512000, "Preallocated buffer size for downloads.") f.IntVar(&c.UploadBufferSize, prefix+"azure.upload-buffer-size", 256000, "Preallocated buffer size for uploads.") @@ -116,7 +111,6 @@ func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.") f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.") f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.") - f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity or not.") } type BlobStorageMetrics struct { @@ -256,7 +250,7 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU blobID = strings.Replace(blobID, ":", b.cfg.ChunkDelimiter, -1) // generate url for new chunk blob - u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.AccountName, b.cfg.ContainerName, blobID)) + u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.StorageAccountName, b.cfg.ContainerName, blobID)) if err != nil { return azblob.BlockBlobURL{}, err } @@ -269,7 +263,7 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU } func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) { - u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.AccountName, b.cfg.ContainerName)) + u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.StorageAccountName, b.cfg.ContainerName)) if err != nil { return azblob.ContainerURL{}, err } @@ -289,11 +283,6 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe }, } - credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.String()) - if err != nil { - return nil, err - } - client := defaultClientFactory() opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { @@ -316,7 +305,12 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe }) } - if !b.cfg.UseManagedIdentity { + if !b.cfg.UseManagedIdentity && b.cfg.UserAssignedID == "" { + credential, err := azblob.NewSharedKeyCredential(b.cfg.StorageAccountName, b.cfg.StorageAccountKey.String()) + if err != nil { + return nil, err + } + return azblob.NewPipeline(credential, opts), nil } @@ -329,7 +323,7 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe } func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) { - spt, err := b.fetchMSIToken() + spt, err := b.getServicePrincipalToken() if err != nil { return nil, err } @@ -357,18 +351,25 @@ func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) { return &tc, nil } -func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) { - // msiEndpoint is the well known endpoint for getting MSI authentications tokens - // msiEndpoint := "http://169.254.169.254/metadata/identity/oauth2/token" for production Jobs - msiEndpoint, _ := adal.GetMSIVMEndpoint() +func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, error) { + var endpoint string + if b.cfg.Endpoint != "" { + endpoint = b.cfg.Endpoint + } else { + endpoint = defaultEndpoints[b.cfg.Environment] + } + + resource := fmt.Sprintf("https://%s.%s", b.cfg.StorageAccountName, endpoint) - // both can be empty, systemAssignedMSI scenario - spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/") - if err != nil { - return nil, err + msiConfig := auth.MSIConfig{ + Resource: resource, + } + + if b.cfg.UserAssignedID != "" { + msiConfig.ClientID = b.cfg.UserAssignedID } - return spt, spt.Refresh() + return msiConfig.ServicePrincipalToken() } // List implements chunk.ObjectClient. @@ -434,11 +435,11 @@ func (c *BlobStorageConfig) Validate() error { } func (b *BlobStorage) selectBlobURLFmt() string { - return endpoints[b.cfg.Environment].blobURLFmt + return fmt.Sprintf("https://%%s.%s/%%s/%%s", defaultEndpoints[b.cfg.Environment]) } func (b *BlobStorage) selectContainerURLFmt() string { - return endpoints[b.cfg.Environment].containerURLFmt + return fmt.Sprintf("https://%%s.%s/%%s", defaultEndpoints[b.cfg.Environment]) } // IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.