Skip to content

Commit

Permalink
fixed multipart upload
Browse files Browse the repository at this point in the history
  • Loading branch information
eusebiu-constantin-petu-dbk committed Sep 15, 2021
1 parent 9df6659 commit 7f10c1e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 38 deletions.
104 changes: 70 additions & 34 deletions pkg/storage/objects_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type ObjectStorage struct {
lock *sync.RWMutex
blobUploads map[string]BlobUpload
log zerolog.Logger
// We must keep track of multipart uploads to S3, because the library
// we are using doesn't cancel multipart uploads
// see: https://github.com/distribution/distribution/blob/main/registry/storage/driver/s3-aws/s3.go#L545
isMultiPartUpload map[string]bool
}

func (is *ObjectStorage) RootDir() string {
Expand Down Expand Up @@ -59,11 +63,12 @@ func NewObjectStorage(rootDir string, gc bool, dedupe bool, log zlog.Logger,
}

is := &ObjectStorage{
rootDir: rootDir,
store: store,
lock: &sync.RWMutex{},
blobUploads: make(map[string]BlobUpload),
log: log.With().Caller().Logger(),
rootDir: rootDir,
store: store,
lock: &sync.RWMutex{},
blobUploads: make(map[string]BlobUpload),
log: log.With().Caller().Logger(),
isMultiPartUpload: make(map[string]bool),
}

return is
Expand Down Expand Up @@ -471,6 +476,7 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy

if err = is.store.PutContent(context.Background(), manifestPath, body); err != nil {
is.log.Error().Err(err).Str("file", manifestPath).Msg("unable to write")
return "", err
}

// now update "index.json"
Expand All @@ -486,6 +492,7 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy

if err = is.store.PutContent(context.Background(), indexPath, buf); err != nil {
is.log.Error().Err(err).Str("file", manifestPath).Msg("unable to write")
return "", err
}

return desc.Digest.String(), nil
Expand Down Expand Up @@ -617,6 +624,8 @@ func (is *ObjectStorage) NewBlobUpload(repo string) (string, error) {

blobUploadPath := is.BlobUploadPath(repo, u)

// here we should create an empty multipart upload, but that's not possible,
// so we just create a regular empty file which will be overwritten by FinishBlobUpload
err = is.store.PutContent(context.Background(), blobUploadPath, []byte{})
if err != nil {
return "", errors.ErrRepoNotFound
Expand All @@ -627,14 +636,31 @@ func (is *ObjectStorage) NewBlobUpload(repo string) (string, error) {

// GetBlobUpload returns the current size of a blob upload.
func (is *ObjectStorage) GetBlobUpload(repo string, uuid string) (int64, error) {
var fileSize int64

blobUploadPath := is.BlobUploadPath(repo, uuid)
fi, err := is.store.Stat(context.Background(), blobUploadPath)

if err != nil {
return -1, err
// if it's not a multipart upload check for the regular empty file
// created by NewBlobUpload, it should have 0 size every time
isMultiPartStarted, ok := is.isMultiPartUpload[blobUploadPath]
if !isMultiPartStarted || !ok {
fi, err := is.store.Stat(context.Background(), blobUploadPath)
if err != nil {
return -1, err
}

fileSize = fi.Size()
// otherwise get the size of multi parts upload
} else {
fi, err := getMultipartFileWriter(is, blobUploadPath)
if err != nil {
return -1, err
}

fileSize = fi.Size()
}

return fi.Size(), nil
return fileSize, nil
}

// PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns
Expand All @@ -645,13 +671,13 @@ func (is *ObjectStorage) PutBlobChunkStreamed(repo string, uuid string, body io.
}

blobUploadPath := is.BlobUploadPath(repo, uuid)
_, err := is.store.Stat(context.Background(), blobUploadPath)

_, err := is.store.Stat(context.Background(), blobUploadPath)
if err != nil {
return -1, errors.ErrUploadNotFound
}

file, err := getMultipartFileWriter(is.store, blobUploadPath)
file, err := getMultipartFileWriter(is, blobUploadPath)
if err != nil {
is.log.Fatal().Err(err).Msg("failed to create multipart upload")
return -1, err
Expand Down Expand Up @@ -684,25 +710,27 @@ func (is *ObjectStorage) PutBlobChunk(repo string, uuid string, from int64, to i

blobUploadPath := is.BlobUploadPath(repo, uuid)

fi, err := is.store.Stat(context.Background(), blobUploadPath)
_, err := is.store.Stat(context.Background(), blobUploadPath)
if err != nil {
return -1, errors.ErrUploadNotFound
}

if from != fi.Size() {
is.log.Error().Int64("expected", from).Int64("actual", fi.Size()).
Msg("invalid range start for blob upload")
return -1, errors.ErrBadUploadRange
}

file, err := getMultipartFileWriter(is.store, blobUploadPath)
file, err := getMultipartFileWriter(is, blobUploadPath)
if err != nil {
is.log.Fatal().Err(err).Msg("failed to create multipart upload")
return -1, err
}

defer file.Close()

if from != file.Size() {
file.Cancel()
file.Close()
is.log.Error().Int64("expected", from).Int64("actual", file.Size()).
Msg("invalid range start for blob upload")
return -1, errors.ErrBadUploadRange
}

buf := new(bytes.Buffer)

_, err = buf.ReadFrom(body)
Expand All @@ -715,6 +743,8 @@ func (is *ObjectStorage) PutBlobChunk(repo string, uuid string, from int64, to i
is.log.Fatal().Err(err).Msg("failed to append to file")
}

is.isMultiPartUpload[blobUploadPath] = true

return int64(n), err
}

Expand Down Expand Up @@ -755,10 +785,7 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read
return err
}

if err := fileWriter.Close(); err != nil {
is.log.Error().Err(err).Msg("failed to close file")
return err
}
fileWriter.Close()

fileReader, err := is.store.Reader(context.Background(), src, 0)
if err != nil {
Expand All @@ -778,10 +805,7 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read
return errors.ErrBadBlobDigest
}

err = fileReader.Close()
if err != nil {
return err
}
fileReader.Close()

dst := is.BlobPath(repo, dstDigest)

Expand All @@ -791,6 +815,9 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read
return err
}

// remove multipart upload, not needed anymore
delete(is.isMultiPartUpload, src)

return nil
}

Expand Down Expand Up @@ -1012,13 +1039,22 @@ func writeFile(store storageDriver.StorageDriver, filepath string, buf []byte) (
return n, nil
}

// Because we can not create an empty multipart upload, we first try to get a multipart upload session,
// otherwise we create it.
func getMultipartFileWriter(store storageDriver.StorageDriver, filepath string) (storageDriver.FileWriter, error) {
file, err := store.Writer(context.Background(), filepath, false)
if err != nil {
// multipart upload already created
file, err = store.Writer(context.Background(), filepath, true)
// Because we cannot create an empty multipart upload, we track multipart uploads
// ourselves so that we know whether to create a fileWriter with append=true or append=false.
// Probing by trying both and handling the errors results in weird S3 API errors.
func getMultipartFileWriter(is *ObjectStorage, filepath string) (storageDriver.FileWriter, error) {
var file storageDriver.FileWriter

var err error

isMultiPartStarted, ok := is.isMultiPartUpload[filepath]
if !isMultiPartStarted || !ok {
file, err = is.store.Writer(context.Background(), filepath, false)
if err != nil {
return file, err
}
} else {
file, err = is.store.Writer(context.Background(), filepath, true)
if err != nil {
return file, err
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,29 @@ func TestStorageAPIs(t *testing.T) {
So(b, ShouldBeGreaterThanOrEqualTo, 0)

content := []byte("test-data1")
firstChunkContent := []byte("test")
firstChunkBuf := bytes.NewBuffer(firstChunkContent)
secondChunkContent := []byte("-data1")
secondChunkBuf := bytes.NewBuffer(secondChunkContent)
firstChunkLen := firstChunkBuf.Len()
secondChunkLen := secondChunkBuf.Len()

buf := bytes.NewBuffer(content)
l := buf.Len()
d := godigest.FromBytes(content)
blobDigest := d

// invalid chunk range - fails with localstack...
// invalid chunk range
_, err = il.PutBlobChunk("test", v, 10, int64(l), buf)
So(err, ShouldNotBeNil)

b, err = il.PutBlobChunk("test", v, 0, int64(l), buf)
b, err = il.PutBlobChunk("test", v, 0, int64(firstChunkLen), firstChunkBuf)
So(err, ShouldBeNil)
So(b, ShouldEqual, l)
blobDigest := d
So(b, ShouldEqual, firstChunkLen)

b, err = il.PutBlobChunk("test", v, int64(firstChunkLen), int64(l), secondChunkBuf)
So(err, ShouldBeNil)
So(b, ShouldEqual, secondChunkLen)

err = il.FinishBlobUpload("test", v, buf, d.String())
So(err, ShouldBeNil)
Expand Down

0 comments on commit 7f10c1e

Please sign in to comment.