Skip to content

Commit

Permalink
azblob: Return io.ErrUnexpectedEOF as error in UploadStream (#22109)
Browse files Browse the repository at this point in the history
  • Loading branch information
souravgupta-msft committed Dec 8, 2023
1 parent 423e02b commit b44962e
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 2 deletions.
1 change: 1 addition & 0 deletions sdk/storage/azblob/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Fixed an issue that would cause metadata keys with empty values to be omitted when enumerating blobs.
* Fixed an issue where passing empty map to set blob tags API was causing panic. Fixes [#21869](https://github.com/Azure/azure-sdk-for-go/issues/21869).
* Fixed an issue where downloaded file has incorrect size when not a multiple of block size. Fixes [#21995](https://github.com/Azure/azure-sdk-for-go/issues/21995).
* Fixed case where `io.ErrUnexpectedEOF` was treated as expected error in `UploadStream`. Fixes [#21837](https://github.com/Azure/azure-sdk-for-go/issues/21837).

### Other Changes

Expand Down
4 changes: 2 additions & 2 deletions sdk/storage/azblob/blockblob/chunkwriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit
}

var n int
n, err = io.ReadFull(src, buffer)
n, err = shared.ReadAtLeast(src, buffer, len(buffer))

if n > 0 {
// some data was read, upload it
Expand Down Expand Up @@ -108,7 +108,7 @@ func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst blockWrit
}

if err != nil { // The reader is done, no more outgoing buffers
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
if errors.Is(err, io.EOF) {
// these are expected errors, we don't surface those
err = nil
} else {
Expand Down
128 changes: 128 additions & 0 deletions sdk/storage/azblob/blockblob/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5033,6 +5033,134 @@ func (s *BlockBlobUnrecordedTestsSuite) TestUploadStreamToBlobProperties() {
_require.EqualValues(actualBlobData, blobData)
}

func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadDownloadStream() {
_require := require.New(s.T())
testName := s.T().Name()
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
_require.NoError(err)

blobSize := 11 * 1024 * 1024
bufferSize := 2 * 1024 * 1024
maxBuffers := 2

containerName := testcommon.GenerateContainerName(testName)
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)

// Set up test blob
blobName := testcommon.GenerateBlobName(testName)
bbClient := testcommon.GetBlockBlobClient(blobName, containerClient)
blobContentReader, blobData := testcommon.GenerateData(blobSize)

_, err = bbClient.UploadStream(context.Background(), blobContentReader, &blockblob.UploadStreamOptions{
BlockSize: int64(bufferSize),
Concurrency: maxBuffers,
Metadata: testcommon.BasicMetadata,
Tags: testcommon.BasicBlobTagsMap,
HTTPHeaders: &testcommon.BasicHeaders,
})
_require.NoError(err)

downloadResponse, err := bbClient.DownloadStream(context.Background(), nil)
_require.NoError(err)

bbClient2 := testcommon.GetBlockBlobClient("blobName2", containerClient)

// UploadStream using http.Response.Body as the reader
_, err = bbClient2.UploadStream(context.Background(), downloadResponse.Body, &blockblob.UploadStreamOptions{
BlockSize: int64(bufferSize),
Concurrency: maxBuffers,
})
_require.NoError(err)

downloadResp2, err := bbClient2.DownloadStream(context.Background(), nil)
_require.NoError(err)

// Assert that the content is correct
actualBlobData, err := io.ReadAll(downloadResp2.Body)
_require.NoError(err)
_require.Equal(len(actualBlobData), len(blobData))
_require.EqualValues(actualBlobData, blobData)
}

// This test simulates UploadStream and DownloadBuffer methods,
// and verifies length and content of file
func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamDownloadBuffer() {
_require := require.New(s.T())
testName := s.T().Name()
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
_require.NoError(err)

containerName := testcommon.GenerateContainerName(testName)
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)

const MiB = 1024 * 1024
testUploadDownload := func(contentSize int) {
content := make([]byte, contentSize)
_, _ = rand.Read(content)
contentMD5 := md5.Sum(content)
body := streaming.NopCloser(bytes.NewReader(content))

srcBlob := containerClient.NewBlockBlobClient("srcblob")

// Prepare source bbClient for copy.
_, err = srcBlob.UploadStream(context.Background(), body, &blockblob.UploadStreamOptions{
BlockSize: 4 * MiB,
Concurrency: 5,
})
_require.NoError(err)

// Download to a buffer and verify contents
buff := make([]byte, contentSize)
b := blob.DownloadBufferOptions{
BlockSize: 5 * MiB,
Concurrency: 4,
}
n, err := srcBlob.DownloadBuffer(context.Background(), buff, &b)
_require.NoError(err)
_require.Equal(int64(contentSize), n)
_require.Equal(contentMD5, md5.Sum(buff[:]))
}

testUploadDownload(0) // zero byte blob
testUploadDownload(5 * MiB)
testUploadDownload(20 * MiB)
testUploadDownload(199 * MiB)
}

type fakeReader struct {
cnt int
}

func (a *fakeReader) Read(bytes []byte) (count int, err error) {
if a.cnt < 5 {
_, buf := testcommon.GenerateData(1024)
n := copy(bytes, buf)
a.cnt++
return n, nil
}
return 0, io.ErrUnexpectedEOF
}

func (s *BlockBlobUnrecordedTestsSuite) TestBlobUploadStreamUsingCustomReader() {
_require := require.New(s.T())
testName := s.T().Name()
svcClient, err := testcommon.GetServiceClient(s.T(), testcommon.TestAccountDefault, nil)
_require.NoError(err)

containerName := testcommon.GenerateContainerName(testName)
containerClient := testcommon.CreateNewContainer(context.Background(), _require, containerName, svcClient)
defer testcommon.DeleteContainer(context.Background(), _require, containerClient)

bbClient := testcommon.GetBlockBlobClient(testcommon.GenerateBlobName(testName), containerClient)

r := &fakeReader{}
_, err = bbClient.UploadStream(context.Background(), r, nil)
_require.Error(err)
_require.Equal(err, io.ErrUnexpectedEOF)
}

func (s *BlockBlobRecordedTestsSuite) TestBlockBlobSetTierOnVersions() {
_require := require.New(s.T())
testName := s.T().Name()
Expand Down
24 changes: 24 additions & 0 deletions sdk/storage/azblob/internal/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,27 @@ func IsIPEndpointStyle(host string) bool {
}
return net.ParseIP(host) != nil
}

// ReadAtLeast reads from r into buf until it has read at least min bytes.
// It returns the number of bytes copied and an error.
// The EOF error is returned if no bytes were read or
// EOF happened after reading fewer than min bytes.
// If min is greater than the length of buf, ReadAtLeast returns ErrShortBuffer.
// On return, n >= min if and only if err == nil.
// If r returns an error having read at least min bytes, the error is dropped.
// This method is same as io.ReadAtLeast except that it does not
// return io.ErrUnexpectedEOF when fewer than min bytes are read.
func ReadAtLeast(r io.Reader, buf []byte, min int) (n int, err error) {
if len(buf) < min {
return 0, io.ErrShortBuffer
}
for n < min && err == nil {
var nn int
nn, err = r.Read(buf[n:])
n += nn
}
if n >= min {
err = nil
}
return
}

0 comments on commit b44962e

Please sign in to comment.