Skip to content

Commit

Permalink
*: support concurrent write for S3 writer (#45723)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
wjhuang2016 authored Aug 4, 2023
1 parent 64450c2 commit 5309c2f
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 22 deletions.
2 changes: 1 addition & 1 deletion br/pkg/mock/storage/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (s *AzureBlobStorage) URI() string {
const azblobChunkSize = 64 * 1024 * 1024

// Create implements the StorageWriter interface.
func (s *AzureBlobStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
uploader := &azblobUploader{
blobClient: client,
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ func WithCompression(inner ExternalStorage, compressionType CompressType) Extern
return &withCompression{ExternalStorage: inner, compressType: compressionType}
}

func (w *withCompression) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (w *withCompression) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
var (
writer ExternalFileWriter
err error
)
if s3Storage, ok := w.ExternalStorage.(*S3Storage); ok {
writer, err = s3Storage.CreateUploader(ctx, name)
} else {
writer, err = w.ExternalStorage.Create(ctx, name)
writer, err = w.ExternalStorage.Create(ctx, name, nil)
}
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (s *GCSStorage) URI() string {
}

// Create implements ExternalStorage interface.
func (s *GCSStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (s *GCSStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
object := s.objectName(name)
wc := s.bucket.Object(object).NewWriter(ctx)
wc.StorageClass = s.gcs.StorageClass
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *HDFSStorage) URI() string {
}

// Create opens a file writer by path. path is relative path to storage base path
func (*HDFSStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (l *LocalStorage) Open(_ context.Context, path string) (ExternalFileReader,
}

// Create implements ExternalStorage interface.
func (l *LocalStorage) Create(_ context.Context, name string) (ExternalFileWriter, error) {
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
file, err := os.Create(filepath.Join(l.base, name))
if err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestDeleteFile(t *testing.T) {
require.NoError(t, err)
require.Equal(t, false, ret)

_, err = store.Create(context.Background(), name)
_, err = store.Create(context.Background(), name, nil)
require.NoError(t, err)

ret, err = store.FileExists(context.Background(), name)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (*MemStorage) URI() string {
// Create creates a file and returning a writer to write data into.
// When the writer is closed, the data is stored in the file.
// It implements the `ExternalStorage` interface
func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMemStoreBasic(t *testing.T) {
require.NotNil(t, err)

// create a writer to write
w, err := store.Create(ctx, "/hello.txt")
w, err := store.Create(ctx, "/hello.txt", nil)
require.Nil(t, err)
_, err = w.Write(ctx, []byte("hello world 3"))
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (*noopStorage) URI() string {
}

// Create implements ExternalStorage interface.
func (*noopStorage) Create(_ context.Context, _ string) (ExternalFileWriter, error) {
func (*noopStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error) {
return &noopWriter{}, nil
}

Expand Down
57 changes: 53 additions & 4 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

alicred "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials"
Expand Down Expand Up @@ -912,11 +913,59 @@ func (rs *S3Storage) CreateUploader(ctx context.Context, name string) (ExternalF
}, nil
}

// Create creates multi upload request.
func (rs *S3Storage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
uploader, err := rs.CreateUploader(ctx, name)
type s3ObjectWriter struct {
wd *io.PipeWriter
wg *sync.WaitGroup
err error
}

// Write implement the io.Writer interface.
func (s *s3ObjectWriter) Write(_ context.Context, p []byte) (int, error) {
return s.wd.Write(p)
}

// Close implement the io.Closer interface.
func (s *s3ObjectWriter) Close(_ context.Context) error {
err := s.wd.Close()
if err != nil {
return nil, err
return err
}
s.wg.Wait()
return s.err
}

// Create creates multi upload request.
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error) {
var uploader ExternalFileWriter
var err error
if option == nil || option.Concurrency <= 1 {
uploader, err = rs.CreateUploader(ctx, name)
if err != nil {
return nil, err
}
} else {
up := s3manager.NewUploaderWithClient(rs.svc, func(u *s3manager.Uploader) {
u.Concurrency = option.Concurrency
u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * 8 * 1024 * 1024)
})
rd, wd := io.Pipe()
upParams := &s3manager.UploadInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + name),
Body: rd,
}
s3Writer := &s3ObjectWriter{wd: wd, wg: &sync.WaitGroup{}}
s3Writer.wg.Add(1)
go func() {
_, err := up.UploadWithContext(ctx, upParams)
err1 := rd.Close()
if err != nil {
log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err), zap.Error(err1))
}
s3Writer.err = err
s3Writer.wg.Done()
}()
uploader = s3Writer
}
uploaderWriter := newBufferedWriter(uploader, hardcodedS3ChunkSize, NoCompression)
return uploaderWriter, nil
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type Writer interface {
Close(ctx context.Context) error
}

type WriterOption struct {
Concurrency int
}

// ExternalStorage represents a kind of file system storage.
type ExternalStorage interface {
// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
Expand All @@ -103,8 +107,8 @@ type ExternalStorage interface {
// URI returns the base path as a URI
URI() string

// Create opens a file writer by path. path is relative path to storage base path
Create(ctx context.Context, path string) (ExternalFileWriter, error)
// Create opens a file writer by path. path is relative path to storage base path. Currently only s3 implemented WriterOption
Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
// Rename file name from oldFileName to newFileName
Rename(ctx context.Context, oldFileName, newFileName string) error
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/storage/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestExternalFileWriter(t *testing.T) {
storage, err := Create(ctx, backend, true)
require.NoError(t, err)
fileName := strings.ReplaceAll(test.name, " ", "-") + ".txt"
writer, err := storage.Create(ctx, fileName)
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestCompressReaderWriter(t *testing.T) {
storage = WithCompression(storage, test.compressType)
suffix := createSuffixString(test.compressType)
fileName := strings.ReplaceAll(test.name, " ", "-") + suffix
writer, err := storage.Create(ctx, fileName)
writer, err := storage.Create(ctx, fileName, nil)
require.NoError(t, err)
for _, str := range test.content {
p := []byte(str)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func fakeCheckpointFiles(
filename := fmt.Sprintf("%v.ts", info.storeID)
buff := make([]byte, 8)
binary.LittleEndian.PutUint64(buff, info.global_checkpoint)
if _, err := s.Create(ctx, filename); err != nil {
if _, err := s.Create(ctx, filename, nil); err != nil {
return errors.Trace(err)
}
if err := s.WriteFile(ctx, filename, buff); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dumpling/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func writeBytes(tctx *tcontext.Context, writer storage.ExternalFileWriter, p []b
func buildFileWriter(tctx *tcontext.Context, s storage.ExternalStorage, fileName string, compressType storage.CompressType) (storage.ExternalFileWriter, func(ctx context.Context) error, error) {
fileName += compressFileSuffix(compressType)
fullPath := s.URI() + "/" + fileName
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName)
writer, err := storage.WithCompression(s, compressType).Create(tctx, fileName, nil)
if err != nil {
tctx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down Expand Up @@ -487,7 +487,7 @@ func buildInterceptFileWriter(pCtx *tcontext.Context, s storage.ExternalStorage,
initRoutine := func() error {
// use separated context pCtx here to make sure context used in ExternalFile won't be canceled before close,
// which will cause a context canceled error when closing gcs's Writer
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName)
w, err := storage.WithCompression(s, compressType).Create(pCtx, fileName, nil)
if err != nil {
pCtx.L().Warn("fail to open file",
zap.String("path", fullPath),
Expand Down

0 comments on commit 5309c2f

Please sign in to comment.