diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 343af576179c..921d8186bf21 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -706,6 +706,10 @@ The `azure_storage_config` configures Azure as a general storage for different d # Maximum time to wait before retrying a request. # CLI flag: -.azure.max-retry-delay [max_retry_delay: | default = 500ms] + +# Use Managed Identity or not. +# CLI flag: -ruler.storage.azure.use-managed-identity +[use_managed_identity: | default = false] ``` ## gcs_storage_config diff --git a/docs/sources/storage/_index.md b/docs/sources/storage/_index.md index 5bd90947ddcd..924c2cc2103e 100644 --- a/docs/sources/storage/_index.md +++ b/docs/sources/storage/_index.md @@ -307,6 +307,7 @@ storage_config: # See https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#containers container_name: request_timeout: 0 + use_managed_identity: boltdb_shipper: active_index_directory: /data/loki/boltdb-shipper-active cache_location: /data/loki/boltdb-shipper-cache diff --git a/go.mod b/go.mod index 2b2ffb212eae..e2d3221fe991 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( cloud.google.com/go/storage v1.10.0 github.com/Azure/azure-pipeline-go v0.2.3 github.com/Azure/azure-storage-blob-go v0.13.0 + github.com/Azure/go-autorest/autorest/adal v0.9.17 github.com/Masterminds/sprig/v3 v3.2.2 github.com/NYTimes/gziphandler v1.1.1 github.com/Shopify/sarama v1.30.0 @@ -114,7 +115,6 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.11.22 // indirect - github.com/Azure/go-autorest/autorest/adal v0.9.17 // indirect github.com/Azure/go-autorest/autorest/azure/auth v0.5.8 // indirect github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect diff --git a/pkg/storage/chunk/azure/blob_storage_client.go b/pkg/storage/chunk/azure/blob_storage_client.go index 43286a105e80..778c879ef642 100644 --- a/pkg/storage/chunk/azure/blob_storage_client.go +++ b/pkg/storage/chunk/azure/blob_storage_client.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" + "github.com/Azure/go-autorest/autorest/adal" "github.com/mattn/go-ieproxy" "github.com/prometheus/client_golang/prometheus" @@ -93,6 +94,7 @@ type BlobStorageConfig struct { MaxRetries int `yaml:"max_retries"` MinRetryDelay time.Duration `yaml:"min_retry_delay"` MaxRetryDelay time.Duration `yaml:"max_retry_delay"` + UseManagedIdentity bool `yaml:"use_managed_identity"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -113,6 +115,7 @@ func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.") f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.") f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.") + f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity or not.") } func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig { @@ -241,11 +244,7 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) { } func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) { - credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value) - if err != nil { - return nil, err - } - + // defining the Azure Pipeline Options opts := azblob.PipelineOptions{ Retry: azblob.RetryOptions{ Policy: azblob.RetryPolicyExponential, @@ -255,6 +254,12 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe MaxRetryDelay: b.cfg.MaxRetryDelay, }, } + + credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value) + if err != nil { + return nil, err + } + client := defaultClientFactory() opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { @@ -277,7 +282,61 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe }) } - return azblob.NewPipeline(credential, opts), nil + if !b.cfg.UseManagedIdentity { + return azblob.NewPipeline(credential, opts), nil + } + + tokenCredential, err := b.getOAuthToken() + if err != nil { + return nil, err + } + + return azblob.NewPipeline(*tokenCredential, opts), nil + +} + +func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) { + spt, err := b.fetchMSIToken() + if err != nil { + return nil, err + } + + // Refresh obtains a fresh token + err = spt.Refresh() + if err != nil { + return nil, err + } + + tc := azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration { + err := spt.Refresh() + if err != nil { + // something went wrong, prevent the refresher from being triggered again + return 0 + } + + // set the new token value + tc.SetToken(spt.Token().AccessToken) + + // get the next token slightly before the current one expires + return time.Until(spt.Token().Expires()) - 10*time.Second + }) + + return &tc, nil +} + +func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) { + // msiEndpoint is the well known endpoint for getting MSI authentications tokens + // msiEndpoint := "http://169.254.169.254/metadata/identity/oauth2/token" for production Jobs + msiEndpoint, _ := adal.GetMSIVMEndpoint() + + // both can be empty, systemAssignedMSI scenario + spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/") + + if err != nil { + return nil, err + } + + return spt, spt.Refresh() } // List implements chunk.ObjectClient.