Skip to content

Commit

Permalink
fix(): fix the problems based on reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
sandy2008 committed Dec 9, 2021
1 parent 0c35fba commit a758ec6
Showing 1 changed file with 35 additions and 35 deletions.
70 changes: 35 additions & 35 deletions pkg/storage/chunk/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,49 +244,53 @@ func (b *BlobStorage) buildContainerURL() (azblob.ContainerURL, error) {
}

func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipeline.Pipeline, error) {
if b.cfg.UseManagedIdentity == false {
// defing the Azure Pipeline Options
opts := azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: (int32)(b.cfg.MaxRetries),
TryTimeout: b.cfg.RequestTimeout,
RetryDelay: b.cfg.MinRetryDelay,
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}

if !b.cfg.UseManagedIdentity {
credential, err := azblob.NewSharedKeyCredential(b.cfg.AccountName, b.cfg.AccountKey.Value)
if err != nil {
return nil, err
}

opts := azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: (int32)(b.cfg.MaxRetries),
TryTimeout: b.cfg.RequestTimeout,
RetryDelay: b.cfg.MinRetryDelay,
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}
client := defaultClientFactory()

opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
}
})

if hedging {
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
client, err := hedgingCfg.ClientWithRegisterer(client, prometheus.WrapRegistererWithPrefix("loki", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}
opts.HTTPSender = pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
resp, err := client.Do(request.WithContext(ctx))
return pipeline.NewHTTPResponse(resp), err
}
})
}

return azblob.NewPipeline(credential, opts), nil
} else {
tokenCredential, err := b.getOAuthToken()
if err != nil {
return nil, err
}

return azblob.NewPipeline(*tokenCredential, azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: (int32)(b.cfg.MaxRetries),
TryTimeout: b.cfg.RequestTimeout,
RetryDelay: b.cfg.MinRetryDelay,
MaxRetryDelay: b.cfg.MaxRetryDelay,
},
}), nil
}
tokenCredential, err := b.getOAuthToken()
if err != nil {
return nil, err
}

return azblob.NewPipeline(*tokenCredential, opts), nil

}

func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
Expand All @@ -300,7 +304,6 @@ func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
if err != nil {
return nil, err
}
client := defaultClientFactory()

tc := azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration {
err := spt.Refresh()
Expand All @@ -324,11 +327,8 @@ func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) {
// msiEndpoint := "http://169.254.169.254/metadata/identity/oauth2/token" for production Jobs
msiEndpoint, _ := adal.GetMSIVMEndpoint()

var spt *adal.ServicePrincipalToken
var err error

// both can be empty, systemAssignedMSI scenario
spt, err = adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")
spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")

if err != nil {
return nil, err
Expand Down

0 comments on commit a758ec6

Please sign in to comment.