Skip to content

Commit

Permalink
Merge pull request #309 from zilliztech/azure-fix
Browse files Browse the repository at this point in the history
fix azure abort err
  • Loading branch information
lentitude2tk authored Mar 5, 2024
2 parents 7a2066c + decf390 commit 86ed608
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions core/storage/azure_object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
Expand Down Expand Up @@ -200,19 +201,39 @@ func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, obj
}

func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error {
var blobCli *blockblob.Client
var fromPathUrl string
if aos.clients[fromBucketName].accessKeyID == aos.clients[toBucketName].accessKeyID {
fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
_, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
return err
fromPathUrl = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
blobCli = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath)
} else {
srcSAS, err := aos.getSAS(fromBucketName)
if err != nil {
return err
}
fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
_, err = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
return err
fromPathUrl = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
blobCli = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath)
}

// we need to abort the previous copy operation before copy from url
abortErr := func() error {
blobProperties, err := blobCli.BlobClient().GetProperties(ctx, nil)
if err != nil {
return fmt.Errorf("storage: azure get properties %w", err)
}
if blobProperties.CopyID != nil {
if _, err = blobCli.AbortCopyFromURL(ctx, *blobProperties.CopyID, nil); err != nil {
return fmt.Errorf("storage: azure abort copy from url %w", err)
}
}
return nil
}()

if _, err := blobCli.CopyFromURL(ctx, fromPathUrl, nil); err != nil {
return fmt.Errorf("storage: azure copy from url %w abort previous %w", err, abortErr)
}

return nil
}

func (aos *AzureObjectStorage) getSAS(bucket string) (*sas.QueryParameters, error) {
Expand Down

0 comments on commit 86ed608

Please sign in to comment.