Skip to content

Commit

Permalink
feat: Add support for Azure user assigned identity (#5891)
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Hipwell <steve.hipwell@gmail.com>
  • Loading branch information
stevehipwell committed May 30, 2022
1 parent 0cb3994 commit 7cc6aa4
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 63 deletions.
31 changes: 20 additions & 11 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -794,19 +794,32 @@ The `azure_storage_config` configures Azure as a general storage for different d
# CLI flag: -<prefix>.azure.environment
[environment: <string> | default = "AzureGlobal"]
# Name of the blob container used to store chunks. This container must be
# created before running cortex.
# CLI flag: -<prefix>.azure.container-name
[container_name: <string> | default = "loki"]
# The Microsoft Azure account name to be used
# Azure storage account name.
# CLI flag: -<prefix>.azure.account-name
[account_name: <string> | default = ""]
# The Microsoft Azure account key to use.
# Azure storage account key.
# CLI flag: -<prefix>.azure.account-key
[account_key: <string> | default = ""]
# Name of the storage account blob container used to store chunks.
# This container must be created before running Loki.
# CLI flag: -<prefix>.azure.container-name
[container_name: <string> | default = "loki"]
# Azure storage endpoint suffix without schema. The storage account name will
# be prefixed to this value to create the FQDN.
# CLI flag: -<prefix>.azure.endpoint-suffix
[endpoint_suffix: <string> | default = ""]
# Use Managed Identity to authenticate to the Azure storage account.
# CLI flag: -<prefix>.azure.use-managed-identity
[use_managed_identity: <boolean> | default = false]
# User assigned identity ID to authenticate to the Azure storage account.
# CLI flag: -<prefix>.azure.user-assigned-id
[user_assigned_id: <string> | default = ""]
# Chunk delimiter to build the blobID
# CLI flag: -<prefix>.azure.chunk-delimiter
[chunk_delimiter: <string> | default = "-"]
Expand Down Expand Up @@ -838,10 +851,6 @@ The `azure_storage_config` configures Azure as a general storage for different d
# Maximum time to wait before retrying a request.
# CLI flag: -<prefix>.azure.max-retry-delay
[max_retry_delay: <duration> | default = 500ms]
# Use Managed Identity or not.
# CLI flag: -ruler.storage.azure.use-managed-identity
[use_managed_identity: <boolean> | default = false]
```

## gcs_storage_config
Expand Down
8 changes: 5 additions & 3 deletions docs/sources/storage/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,14 +303,16 @@ schema_config:
store: boltdb-shipper
storage_config:
azure:
# Your Azure storage account name
account_name: <account-name>
# For the account-key, see docs: https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal
account_key: <account-key>
# Your azure account name
account_name: <account-name>
# See https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#containers
container_name: <container-name>
request_timeout: 0
use_managed_identity: <true|false>
# Providing a user assigned ID will override use_managed_identity
user_assigned_id: <user-assigned-identity-id>
request_timeout: 0
boltdb_shipper:
active_index_directory: /data/loki/boltdb-shipper-active
cache_location: /data/loki/boltdb-shipper-cache
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.13.0
github.com/Azure/go-autorest/autorest/adal v0.9.18
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/Masterminds/sprig/v3 v3.2.2
github.com/NYTimes/gziphandler v1.1.1
github.com/Shopify/sarama v1.30.0
Expand Down Expand Up @@ -120,7 +121,6 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.24 // indirect
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8 // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ memberlist:
"should equal default environment since unspecified in config")

assert.Equal(t, "milkyway", actual.ContainerName)
assert.Equal(t, "3rd_planet", actual.AccountName)
assert.Equal(t, "water", actual.AccountKey.String())
assert.Equal(t, "3rd_planet", actual.StorageAccountName)
assert.Equal(t, "water", actual.StorageAccountKey.String())
assert.Equal(t, 27, actual.DownloadBufferSize)
assert.Equal(t, 42, actual.UploadBufferSize)
assert.Equal(t, 13, actual.UploadBufferCount)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/azure/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name")
f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "", "Azure storage container name")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "loki", "Azure storage container name")
f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN")
f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors")
cfg.Config.RegisterFlagsWithPrefix(prefix+"azure.", f)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/bucket/azure/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

// defaultConfig should match the default flag values defined in RegisterFlagsWithPrefix.
var defaultConfig = Config{
MaxRetries: 20,
ContainerName: "loki",
MaxRetries: 20,
Config: http.Config{
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 2 * time.Minute,
Expand Down
89 changes: 45 additions & 44 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/grafana/dskit/flagext"
"github.com/mattn/go-ieproxy"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -38,23 +39,12 @@ const (
var (
supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment}
noClientKey = azblob.ClientProvidedKeyOptions{}
endpoints = map[string]struct{ blobURLFmt, containerURLFmt string }{
azureGlobal: {
"https://%s.blob.core.windows.net/%s/%s",
"https://%s.blob.core.windows.net/%s",
},
azureChinaCloud: {
"https://%s.blob.core.chinacloudapi.cn/%s/%s",
"https://%s.blob.core.chinacloudapi.cn/%s",
},
azureGermanCloud: {
"https://%s.blob.core.cloudapi.de/%s/%s",
"https://%s.blob.core.cloudapi.de/%s",
},
azureUSGovernment: {
"https://%s.blob.core.usgovcloudapi.net/%s/%s",
"https://%s.blob.core.usgovcloudapi.net/%s",
},

defaultEndpoints = map[string]string{
azureGlobal: "blob.core.windows.net",
azureChinaCloud: "blob.core.chinacloudapi.cn",
azureGermanCloud: "blob.core.cloudapi.de",
azureUSGovernment: "blob.core.usgovcloudapi.net",
}

// default Azure http client.
Expand Down Expand Up @@ -83,18 +73,20 @@ var (
// BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage.
type BlobStorageConfig struct {
Environment string `yaml:"environment"`
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
ContainerName string `yaml:"container_name"`
AccountName string `yaml:"account_name"`
Endpoint string `yaml:"endpoint_suffix"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
UserAssignedID string `yaml:"user_assigned_id"`
ChunkDelimiter string `yaml:"chunk_delimiter"`
AccountKey flagext.Secret `yaml:"account_key"`
DownloadBufferSize int `yaml:"download_buffer_size"`
UploadBufferSize int `yaml:"upload_buffer_size"`
UploadBufferCount int `yaml:"upload_buffer_count"`
RequestTimeout time.Duration `yaml:"request_timeout"`
MaxRetries int `yaml:"max_retries"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -105,18 +97,20 @@ func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.Environment, prefix+"azure.environment", azureGlobal, fmt.Sprintf("Azure Cloud environment. Supported values are: %s.", strings.Join(supportedEnvironments, ", ")))
f.StringVar(&c.ContainerName, prefix+"azure.container-name", "loki", "Name of the blob container used to store chunks. This container must be created before running Loki.")
f.StringVar(&c.AccountName, prefix+"azure.account-name", "", "The Microsoft Azure account name to be used")
f.StringVar(&c.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name.")
f.Var(&c.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key.")
f.StringVar(&c.ContainerName, prefix+"azure.container-name", "loki", "Name of the storage account blob container used to store chunks. This container must be created before running cortex.")
f.StringVar(&c.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The storage account name will be prefixed to this value to create the FQDN.")
f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity to authenticate to the Azure storage account.")
f.StringVar(&c.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned identity ID to authenticate to the Azure storage account.")
f.StringVar(&c.ChunkDelimiter, prefix+"azure.chunk-delimiter", "-", "Chunk delimiter for blob ID to be used")
f.Var(&c.AccountKey, prefix+"azure.account-key", "The Microsoft Azure account key to use.")
f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage.")
f.IntVar(&c.DownloadBufferSize, prefix+"azure.download-buffer-size", 512000, "Preallocated buffer size for downloads.")
f.IntVar(&c.UploadBufferSize, prefix+"azure.upload-buffer-size", 256000, "Preallocated buffer size for uploads.")
f.IntVar(&c.UploadBufferCount, prefix+"azure.download-buffer-count", 1, "Number of buffers used to used to upload a chunk.")
f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.")
f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.")
f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity or not.")
}

type BlobStorageMetrics struct {
Expand Down Expand Up @@ -256,7 +250,7 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU
blobID = strings.Replace(blobID, ":", b.cfg.ChunkDelimiter, -1)

// generate url for new chunk blob
u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.AccountName, b.cfg.ContainerName, blobID))
u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.StorageAccountName, b.cfg.ContainerName, blobID))
if err != nil {
return azblob.BlockBlobURL{}, err
}
Expand All @@ -269,7 +263,7 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU
}

func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.AccountName, b.cfg.ContainerName))
u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.StorageAccountName, b.cfg.ContainerName))
if err != nil {
return azblob.ContainerURL{}, err
}
Expand All @@ -289,11 +283,6 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
},
}

credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.String())
if err != nil {
return nil, err
}

client := defaultClientFactory()

opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
Expand All @@ -316,7 +305,12 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
})
}

if !b.cfg.UseManagedIdentity {
if !b.cfg.UseManagedIdentity && b.cfg.UserAssignedID == "" {
credential, err := azblob.NewSharedKeyCredential(b.cfg.StorageAccountName, b.cfg.StorageAccountKey.String())
if err != nil {
return nil, err
}

return azblob.NewPipeline(credential, opts), nil
}

Expand All @@ -329,7 +323,7 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
}

func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
spt, err := b.fetchMSIToken()
spt, err := b.getServicePrincipalToken()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -357,18 +351,25 @@ func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
return &tc, nil
}

func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) {
// msiEndpoint is the well known endpoint for getting MSI authentications tokens
// msiEndpoint := "http://169.254.169.254/metadata/identity/oauth2/token" for production Jobs
msiEndpoint, _ := adal.GetMSIVMEndpoint()
func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, error) {
var endpoint string
if b.cfg.Endpoint != "" {
endpoint = b.cfg.Endpoint
} else {
endpoint = defaultEndpoints[b.cfg.Environment]
}

resource := fmt.Sprintf("https://%s.%s", b.cfg.StorageAccountName, endpoint)

// both can be empty, systemAssignedMSI scenario
spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")
if err != nil {
return nil, err
msiConfig := auth.MSIConfig{
Resource: resource,
}

if b.cfg.UserAssignedID != "" {
msiConfig.ClientID = b.cfg.UserAssignedID
}

return spt, spt.Refresh()
return msiConfig.ServicePrincipalToken()
}

// List implements chunk.ObjectClient.
Expand Down Expand Up @@ -434,11 +435,11 @@ func (c *BlobStorageConfig) Validate() error {
}

func (b *BlobStorage) selectBlobURLFmt() string {
return endpoints[b.cfg.Environment].blobURLFmt
return fmt.Sprintf("https://%%s.%s/%%s/%%s", defaultEndpoints[b.cfg.Environment])
}

func (b *BlobStorage) selectContainerURLFmt() string {
return endpoints[b.cfg.Environment].containerURLFmt
return fmt.Sprintf("https://%%s.%s/%%s", defaultEndpoints[b.cfg.Environment])
}

// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
Expand Down

0 comments on commit 7cc6aa4

Please sign in to comment.