Skip to content

Commit

Permalink
Added error checks, disabled storage container creation
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanovOleg committed Nov 12, 2018
1 parent 89db105 commit e1e3f8a
Showing 1 changed file with 62 additions and 10 deletions.
72 changes: 62 additions & 10 deletions pkg/skbn/abs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package skbn
import (
"bytes"
"context"
"errors"
"fmt"
"log"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
Expand All @@ -35,15 +36,27 @@ func getNewPipeline() (pipeline.Pipeline, error) {
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")

if len(accountName) == 0 || len(accountKey) == 0 {
log.Fatal("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
err := errors.New("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
return nil, err
}

credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)

if err != nil {
log.Fatal("Invalid credentials with error: " + err.Error())
return nil, err
}

po := azblob.PipelineOptions{
Retry: azblob.RetryOptions{
Policy: azblob.RetryPolicyExponential,
MaxTries: 3,
TryTimeout: time.Second * 3,
RetryDelay: time.Second * 1,
MaxRetryDelay: time.Second * 3,
},
}
pl := azblob.NewPipeline(credential, azblob.PipelineOptions{})

pl := azblob.NewPipeline(credential, po)

return pl, err
}
Expand Down Expand Up @@ -93,16 +106,26 @@ func GetClientToAbs(ctx context.Context, path string) (pipeline.Pipeline, error)
pSplit := strings.Split(path, "/")
a, c, _ := initAbsVariables(pSplit)
pl, err := getNewPipeline()

if err != nil {
return nil, err
}

su, err := getServiceURL(pl, a)

if err != nil {
return nil, err
}

lc, err := listContainers(ctx, su)
cu, err := getContainerURL(pl, a, c)

if !containerExists(lc, c) {
_, err := createContainer(ctx, pl, cu)
if err != nil {
return nil, err
}

if err != nil {
return nil, err
}
if !containerExists(lc, c) {
err := errors.New("Azure Blob Storage container doesn't exist")
return nil, err
}

return pl, err
Expand All @@ -119,6 +142,11 @@ func GetListOfFilesFromAbs(ctx context.Context, iClient interface{}, path string
a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return nil, err
}

bl := []string{}

for marker := (azblob.Marker{}); marker.NotDone(); {
Expand Down Expand Up @@ -149,8 +177,23 @@ func DownloadFromAbs(ctx context.Context, iClient interface{}, path string) ([]b
a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return nil, err
}

bu, err := getBlobURL(cu, p)

if err != nil {
return nil, err
}

dr, err := bu.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)

if err != nil {
return nil, err
}

bs := dr.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
dd := bytes.Buffer{}
_, err = dd.ReadFrom(bs)
Expand All @@ -174,8 +217,17 @@ func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath stri
a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return err
}

bu, err := getBlobURL(cu, p)

if err != nil {
return err
}

_, err = azblob.UploadBufferToBlockBlob(ctx, buffer, bu, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})
Expand Down

0 comments on commit e1e3f8a

Please sign in to comment.