Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Azure storage account user assigned identity #5891

Merged
merged 1 commit into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -761,19 +761,32 @@ The `azure_storage_config` configures Azure as a general storage for different d
# CLI flag: -<prefix>.azure.environment
[environment: <string> | default = "AzureGlobal"]

# Name of the blob container used to store chunks. This container must be
# created before running cortex.
# CLI flag: -<prefix>.azure.container-name
[container_name: <string> | default = "loki"]

# The Microsoft Azure account name to be used
# Azure storage account name.
# CLI flag: -<prefix>.azure.account-name
[account_name: <string> | default = ""]

# The Microsoft Azure account key to use.
# Azure storage account key.
# CLI flag: -<prefix>.azure.account-key
[account_key: <string> | default = ""]

# Name of the storage account blob container used to store chunks.
# This container must be created before running Loki.
# CLI flag: -<prefix>.azure.container-name
[container_name: <string> | default = "loki"]

# Azure storage endpoint suffix without schema. The storage account name will
# be prefixed to this value to create the FQDN.
# CLI flag: -<prefix>.azure.endpoint-suffix
[endpoint_suffix: <string> | default = ""]

# Use Managed Identity to authenticate to the Azure storage account.
# CLI flag: -<prefix>.azure.use-managed-identity
[use_managed_identity: <boolean> | default = false]

# User assigned identity ID to authenticate to the Azure storage account.
# CLI flag: -<prefix>.azure.user-assigned-id
[user_assigned_id: <string> | default = ""]

# Chunk delimiter to build the blobID
# CLI flag: -<prefix>.azure.chunk-delimiter
[chunk_delimiter: <string> | default = "-"]
Expand Down Expand Up @@ -805,10 +818,6 @@ The `azure_storage_config` configures Azure as a general storage for different d
# Maximum time to wait before retrying a request.
# CLI flag: -<prefix>.azure.max-retry-delay
[max_retry_delay: <duration> | default = 500ms]

# Use Managed Identity or not.
# CLI flag: -ruler.storage.azure.use-managed-identity
[use_managed_identity: <boolean> | default = false]
```

## gcs_storage_config
Expand Down
8 changes: 5 additions & 3 deletions docs/sources/storage/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,16 @@ schema_config:
store: boltdb-shipper
storage_config:
azure:
# Your Azure storage account name
account_name: <account-name>
# For the account-key, see docs: https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal
account_key: <account-key>
# Your azure account name
account_name: <account-name>
# See https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#containers
container_name: <container-name>
request_timeout: 0
use_managed_identity: <true|false>
# Providing a user assigned ID will override use_managed_identity
user_assigned_id: <user-assigned-identity-id>
request_timeout: 0
boltdb_shipper:
active_index_directory: /data/loki/boltdb-shipper-active
cache_location: /data/loki/boltdb-shipper-cache
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.13.0
github.com/Azure/go-autorest/autorest/adal v0.9.18
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/Masterminds/sprig/v3 v3.2.2
github.com/NYTimes/gziphandler v1.1.1
github.com/Shopify/sarama v1.30.0
Expand Down Expand Up @@ -120,7 +121,6 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.24 // indirect
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8 // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ memberlist:
"should equal default environment since unspecified in config")

assert.Equal(t, "milkyway", actual.ContainerName)
assert.Equal(t, "3rd_planet", actual.AccountName)
assert.Equal(t, "water", actual.AccountKey.String())
assert.Equal(t, "3rd_planet", actual.StorageAccountName)
assert.Equal(t, "water", actual.StorageAccountKey.String())
assert.Equal(t, 27, actual.DownloadBufferSize)
assert.Equal(t, 42, actual.UploadBufferSize)
assert.Equal(t, 13, actual.UploadBufferCount)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/azure/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name")
f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "", "Azure storage container name")
f.StringVar(&cfg.ContainerName, prefix+"azure.container-name", "loki", "Azure storage container name")
f.StringVar(&cfg.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The account name will be prefixed to this value to create the FQDN")
f.IntVar(&cfg.MaxRetries, prefix+"azure.max-retries", 20, "Number of retries for recoverable errors")
cfg.Config.RegisterFlagsWithPrefix(prefix+"azure.", f)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/bucket/azure/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

// defaultConfig should match the default flag values defined in RegisterFlagsWithPrefix.
var defaultConfig = Config{
MaxRetries: 20,
ContainerName: "loki",
MaxRetries: 20,
Config: http.Config{
IdleConnTimeout: 90 * time.Second,
ResponseHeaderTimeout: 2 * time.Minute,
Expand Down
89 changes: 45 additions & 44 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/grafana/dskit/flagext"
"github.com/mattn/go-ieproxy"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -38,23 +39,12 @@ const (
var (
supportedEnvironments = []string{azureGlobal, azureChinaCloud, azureGermanCloud, azureUSGovernment}
noClientKey = azblob.ClientProvidedKeyOptions{}
endpoints = map[string]struct{ blobURLFmt, containerURLFmt string }{
azureGlobal: {
"https://%s.blob.core.windows.net/%s/%s",
"https://%s.blob.core.windows.net/%s",
},
azureChinaCloud: {
"https://%s.blob.core.chinacloudapi.cn/%s/%s",
"https://%s.blob.core.chinacloudapi.cn/%s",
},
azureGermanCloud: {
"https://%s.blob.core.cloudapi.de/%s/%s",
"https://%s.blob.core.cloudapi.de/%s",
},
azureUSGovernment: {
"https://%s.blob.core.usgovcloudapi.net/%s/%s",
"https://%s.blob.core.usgovcloudapi.net/%s",
},

defaultEndpoints = map[string]string{
azureGlobal: "blob.core.windows.net",
azureChinaCloud: "blob.core.chinacloudapi.cn",
azureGermanCloud: "blob.core.cloudapi.de",
azureUSGovernment: "blob.core.usgovcloudapi.net",
}

// default Azure http client.
Expand Down Expand Up @@ -83,18 +73,20 @@ var (
// BlobStorageConfig defines the configurable flags that can be defined when using azure blob storage.
type BlobStorageConfig struct {
Environment string `yaml:"environment"`
StorageAccountName string `yaml:"account_name"`
StorageAccountKey flagext.Secret `yaml:"account_key"`
ContainerName string `yaml:"container_name"`
AccountName string `yaml:"account_name"`
Endpoint string `yaml:"endpoint_suffix"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
UserAssignedID string `yaml:"user_assigned_id"`
ChunkDelimiter string `yaml:"chunk_delimiter"`
AccountKey flagext.Secret `yaml:"account_key"`
DownloadBufferSize int `yaml:"download_buffer_size"`
UploadBufferSize int `yaml:"upload_buffer_size"`
UploadBufferCount int `yaml:"upload_buffer_count"`
RequestTimeout time.Duration `yaml:"request_timeout"`
MaxRetries int `yaml:"max_retries"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -105,18 +97,20 @@ func (c *BlobStorageConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.Environment, prefix+"azure.environment", azureGlobal, fmt.Sprintf("Azure Cloud environment. Supported values are: %s.", strings.Join(supportedEnvironments, ", ")))
f.StringVar(&c.ContainerName, prefix+"azure.container-name", "loki", "Name of the blob container used to store chunks. This container must be created before running Loki.")
f.StringVar(&c.AccountName, prefix+"azure.account-name", "", "The Microsoft Azure account name to be used")
f.StringVar(&c.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name.")
f.Var(&c.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key.")
f.StringVar(&c.ContainerName, prefix+"azure.container-name", "loki", "Name of the storage account blob container used to store chunks. This container must be created before running cortex.")
f.StringVar(&c.Endpoint, prefix+"azure.endpoint-suffix", "", "Azure storage endpoint suffix without schema. The storage account name will be prefixed to this value to create the FQDN.")
f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity to authenticate to the Azure storage account.")
f.StringVar(&c.UserAssignedID, prefix+"azure.user-assigned-id", "", "User assigned identity ID to authenticate to the Azure storage account.")
f.StringVar(&c.ChunkDelimiter, prefix+"azure.chunk-delimiter", "-", "Chunk delimiter for blob ID to be used")
f.Var(&c.AccountKey, prefix+"azure.account-key", "The Microsoft Azure account key to use.")
f.DurationVar(&c.RequestTimeout, prefix+"azure.request-timeout", 30*time.Second, "Timeout for requests made against azure blob storage.")
f.IntVar(&c.DownloadBufferSize, prefix+"azure.download-buffer-size", 512000, "Preallocated buffer size for downloads.")
f.IntVar(&c.UploadBufferSize, prefix+"azure.upload-buffer-size", 256000, "Preallocated buffer size for uploads.")
f.IntVar(&c.UploadBufferCount, prefix+"azure.download-buffer-count", 1, "Number of buffers used to used to upload a chunk.")
f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.")
f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.")
f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity or not.")
}

type BlobStorageMetrics struct {
Expand Down Expand Up @@ -256,7 +250,7 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU
blobID = strings.Replace(blobID, ":", b.cfg.ChunkDelimiter, -1)

// generate url for new chunk blob
u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.AccountName, b.cfg.ContainerName, blobID))
u, err := url.Parse(fmt.Sprintf(b.selectBlobURLFmt(), b.cfg.StorageAccountName, b.cfg.ContainerName, blobID))
if err != nil {
return azblob.BlockBlobURL{}, err
}
Expand All @@ -269,7 +263,7 @@ func (b *BlobStorage) getBlobURL(blobID string, hedging bool) (azblob.BlockBlobU
}

func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.AccountName, b.cfg.ContainerName))
u, err := url.Parse(fmt.Sprintf(b.selectContainerURLFmt(), b.cfg.StorageAccountName, b.cfg.ContainerName))
if err != nil {
return azblob.ContainerURL{}, err
}
Expand All @@ -289,11 +283,6 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
},
}

credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.String())
if err != nil {
return nil, err
}

client := defaultClientFactory()

opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
Expand All @@ -316,7 +305,12 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
})
}

if !b.cfg.UseManagedIdentity {
if !b.cfg.UseManagedIdentity && b.cfg.UserAssignedID == "" {
credential, err := azblob.NewSharedKeyCredential(b.cfg.StorageAccountName, b.cfg.StorageAccountKey.String())
if err != nil {
return nil, err
}

return azblob.NewPipeline(credential, opts), nil
}

Expand All @@ -329,7 +323,7 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
}

func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
spt, err := b.fetchMSIToken()
spt, err := b.getServicePrincipalToken()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -357,18 +351,25 @@ func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
return &tc, nil
}

func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) {
// msiEndpoint is the well known endpoint for getting MSI authentications tokens
// msiEndpoint := "http://169.254.169.254/metadata/identity/oauth2/token" for production Jobs
msiEndpoint, _ := adal.GetMSIVMEndpoint()
func (b *BlobStorage) getServicePrincipalToken() (*adal.ServicePrincipalToken, error) {
var endpoint string
if b.cfg.Endpoint != "" {
endpoint = b.cfg.Endpoint
} else {
endpoint = defaultEndpoints[b.cfg.Environment]
}

resource := fmt.Sprintf("https://%s.%s", b.cfg.StorageAccountName, endpoint)

// both can be empty, systemAssignedMSI scenario
spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")
if err != nil {
return nil, err
msiConfig := auth.MSIConfig{
Resource: resource,
}

if b.cfg.UserAssignedID != "" {
msiConfig.ClientID = b.cfg.UserAssignedID
}

return spt, spt.Refresh()
return msiConfig.ServicePrincipalToken()
}

// List implements chunk.ObjectClient.
Expand Down Expand Up @@ -434,11 +435,11 @@ func (c *BlobStorageConfig) Validate() error {
}

func (b *BlobStorage) selectBlobURLFmt() string {
return endpoints[b.cfg.Environment].blobURLFmt
return fmt.Sprintf("https://%%s.%s/%%s/%%s", defaultEndpoints[b.cfg.Environment])
}

func (b *BlobStorage) selectContainerURLFmt() string {
return endpoints[b.cfg.Environment].containerURLFmt
return fmt.Sprintf("https://%%s.%s/%%s", defaultEndpoints[b.cfg.Environment])
}

// IsObjectNotFoundErr returns true if error means that object is not found. Relevant to GetObject and DeleteObject operations.
Expand Down