Skip to content

Commit

Permalink
feat(): add ManagedIdentity in Azure Blob Storage (#4858)
Browse files Browse the repository at this point in the history
* feat(): add ManagedIdentity in Azure

* fix(): fix merge error

* fix(): fix a problem during conflict resolving

* fix(): fix the problems based on reviews

* feat(): add deps

* Update pkg/storage/chunk/azure/blob_storage_client.go

* chore(): adjust mi logics

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
sandy2008 and cyriltovena authored Dec 15, 2021
1 parent 4687b41 commit fc2bc69
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 7 deletions.
4 changes: 4 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,10 @@ The `azure_storage_config` configures Azure as a general storage for different d
# Maximum time to wait before retrying a request.
# CLI flag: -<prefix>.azure.max-retry-delay
[max_retry_delay: <duration> | default = 500ms]
# Use Managed Identity or not.
# CLI flag: -ruler.storage.azure.use-managed-identity
[use_managed_identity: <boolean> | default = false]
```

## gcs_storage_config
Expand Down
1 change: 1 addition & 0 deletions docs/sources/storage/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ storage_config:
# See https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction#containers
container_name: <container-name>
request_timeout: 0
use_managed_identity: <true|false>
boltdb_shipper:
active_index_directory: /data/loki/boltdb-shipper-active
cache_location: /data/loki/boltdb-shipper-cache
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
cloud.google.com/go/storage v1.10.0
github.com/Azure/azure-pipeline-go v0.2.3
github.com/Azure/azure-storage-blob-go v0.13.0
github.com/Azure/go-autorest/autorest/adal v0.9.17
github.com/Masterminds/sprig/v3 v3.2.2
github.com/NYTimes/gziphandler v1.1.1
github.com/Shopify/sarama v1.30.0
Expand Down Expand Up @@ -114,7 +115,6 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.22 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.17 // indirect
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8 // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
Expand Down
71 changes: 65 additions & 6 deletions pkg/storage/chunk/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/mattn/go-ieproxy"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -93,6 +94,7 @@ type BlobStorageConfig struct {
MaxRetries int `yaml:"max_retries"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
UseManagedIdentity bool `yaml:"use_managed_identity"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -113,6 +115,7 @@ func (c *BlobStorageConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS
f.IntVar(&c.MaxRetries, prefix+"azure.max-retries", 5, "Number of retries for a request which times out.")
f.DurationVar(&c.MinRetryDelay, prefix+"azure.min-retry-delay", 10*time.Millisecond, "Minimum time to wait before retrying a request.")
f.DurationVar(&c.MaxRetryDelay, prefix+"azure.max-retry-delay", 500*time.Millisecond, "Maximum time to wait before retrying a request.")
f.BoolVar(&c.UseManagedIdentity, prefix+"azure.use-managed-identity", false, "Use Managed Identity or not.")
}

func (c *BlobStorageConfig) ToCortexAzureConfig() cortex_azure.BlobStorageConfig {
Expand Down Expand Up @@ -241,11 +244,7 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
}

func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) {
credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
if err != nil {
return nil, err
}

// defining the Azure Pipeline Options
opts := azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
Expand All @@ -255,6 +254,12 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}

credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
if err != nil {
return nil, err
}

client := defaultClientFactory()

opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
Expand All @@ -277,7 +282,61 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
})
}

return azblob.NewPipeline(credential, opts), nil
if !b.cfg.UseManagedIdentity {
return azblob.NewPipeline(credential, opts), nil
}

tokenCredential, err := b.getOAuthToken()
if err != nil {
return nil, err
}

return azblob.NewPipeline(*tokenCredential, opts), nil

}

func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
spt, err := b.fetchMSIToken()
if err != nil {
return nil, err
}

// Refresh obtains a fresh token
err = spt.Refresh()
if err != nil {
return nil, err
}

tc := azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration {
err := spt.Refresh()
if err != nil {
// something went wrong, prevent the refresher from being triggered again
return 0
}

// set the new token value
tc.SetToken(spt.Token().AccessToken)

// get the next token slightly before the current one expires
return time.Until(spt.Token().Expires()) - 10*time.Second
})

return &tc, nil
}

func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) {
// msiEndpoint is the well known endpoint for getting MSI authentications tokens
// msiEndpoint := "http://169.254.169.254/metadata/identity/oauth2/token" for production Jobs
msiEndpoint, _ := adal.GetMSIVMEndpoint()

// both can be empty, systemAssignedMSI scenario
spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")

if err != nil {
return nil, err
}

return spt, spt.Refresh()
}

// List implements chunk.ObjectClient.
Expand Down

0 comments on commit fc2bc69

Please sign in to comment.