Skip to content

Commit

Permalink
align abs to conventions
Browse files Browse the repository at this point in the history
  • Loading branch information
maorfr committed Nov 14, 2018
1 parent 9cad4fc commit f1e0a78
Showing 1 changed file with 11 additions and 35 deletions.
46 changes: 11 additions & 35 deletions pkg/skbn/abs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package skbn
import (
"bytes"
"context"
"errors"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -35,13 +34,14 @@ func initAbsVariables(split []string) (string, string, string) {
func getNewPipeline() (pipeline.Pipeline, error) {
accountName, accountKey := os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")

if len(accountName) == 0 || len(accountKey) == 0 {
err := errors.New("Either the AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
return nil, err
if len(accountName) == 0 {
return nil, fmt.Errorf("AZURE_STORAGE_ACCOUNT environment variable is not set")
}
if len(accountKey) == 0 {
return nil, fmt.Errorf("AZURE_STORAGE_ACCESS_KEY environment variable is not set")
}

credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)

if err != nil {
return nil, err
}
Expand All @@ -62,9 +62,7 @@ func getNewPipeline() (pipeline.Pipeline, error) {
}

func getServiceURL(pl pipeline.Pipeline, accountName string) (azblob.ServiceURL, error) {
URL, err := url.Parse(
fmt.Sprintf("https://%s.blob.core.windows.net/", accountName))

URL, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/", accountName))
if err != nil {
return azblob.ServiceURL{}, err
}
Expand All @@ -74,9 +72,7 @@ func getServiceURL(pl pipeline.Pipeline, accountName string) (azblob.ServiceURL,
}

func getContainerURL(pl pipeline.Pipeline, accountName string, containerName string) (azblob.ContainerURL, error) {
URL, err := url.Parse(
fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))

URL, err := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName))
if err != nil {
return azblob.ContainerURL{}, err
}
Expand All @@ -91,7 +87,6 @@ func getBlobURL(curl azblob.ContainerURL, blob string) azblob.BlockBlobURL {

func listContainers(ctx context.Context, surl azblob.ServiceURL) ([]azblob.ContainerItem, error) {
lc, err := surl.ListContainersSegment(ctx, azblob.Marker{}, azblob.ListContainersSegmentOptions{})

if err != nil {
return nil, err
}
Expand All @@ -114,26 +109,19 @@ func GetClientToAbs(ctx context.Context, path string) (pipeline.Pipeline, error)
pSplit := strings.Split(path, "/")
a, c, _ := initAbsVariables(pSplit)
pl, err := getNewPipeline()

if err != nil {
return nil, err
}

su, err := getServiceURL(pl, a)

if err != nil {
return nil, err
}

lc, err := listContainers(ctx, su)

if err != nil {
return nil, err
}

if !containerExists(lc, c) {
err := errors.New("Azure Blob Storage container doesn't exist")
return nil, err
return nil, fmt.Errorf("Azure Blob Storage container doesn't exist")
}

return pl, nil
Expand All @@ -142,34 +130,29 @@ func GetClientToAbs(ctx context.Context, path string) (pipeline.Pipeline, error)
// GetListOfFilesFromAbs gets list of files in path from azure blob storage (recursive)
func GetListOfFilesFromAbs(ctx context.Context, iClient interface{}, path string) ([]string, error) {
pSplit := strings.Split(path, "/")

if err := validateAbsPath(pSplit); err != nil {
return nil, err
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return nil, err
}

bl := []string{}

for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := cu.ListBlobsFlatSegment(ctx, marker, azblob.ListBlobsSegmentOptions{})

if err != nil {
return nil, err
}

marker = listBlob.NextMarker

for _, blobInfo := range listBlob.Segment.BlobItems {
if strings.Contains(blobInfo.Name, p) {
bl = append(bl, strings.Replace(blobInfo.Name, p, "", 1))
if !strings.Contains(blobInfo.Name, p) {
continue
}
bl = append(bl, strings.Replace(blobInfo.Name, p, "", 1))
}
}

Expand All @@ -183,26 +166,22 @@ func DownloadFromAbs(ctx context.Context, iClient interface{}, path string) ([]b
if err := validateAbsPath(pSplit); err != nil {
return nil, err
}

a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return nil, err
}

bu := getBlobURL(cu, p)
dr, err := bu.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)

if err != nil {
return nil, err
}

bs := dr.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
dd := bytes.Buffer{}
_, err = dd.ReadFrom(bs)

if err != nil {
return nil, err
}
Expand All @@ -213,7 +192,6 @@ func DownloadFromAbs(ctx context.Context, iClient interface{}, path string) ([]b
// UploadToAbs uploads a single file to azure blob storage
func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath string, buffer []byte) error {
pSplit := strings.Split(toPath, "/")

if err := validateAbsPath(pSplit); err != nil {
return err
}
Expand All @@ -226,7 +204,6 @@ func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath stri
a, c, p := initAbsVariables(pSplit)
pl := iClient.(pipeline.Pipeline)
cu, err := getContainerURL(pl, a, c)

if err != nil {
return err
}
Expand All @@ -236,7 +213,6 @@ func UploadToAbs(ctx context.Context, iClient interface{}, toPath, fromPath stri
_, err = azblob.UploadBufferToBlockBlob(ctx, buffer, bu, azblob.UploadToBlockBlobOptions{
BlockSize: 4 * 1024 * 1024,
Parallelism: 16})

if err != nil {
return err
}
Expand Down

0 comments on commit f1e0a78

Please sign in to comment.