From 01315dda3a1243263fa8282d91758805a5ec2c35 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 23 Sep 2020 11:37:36 +0900 Subject: [PATCH 01/13] feat: add test Signed-off-by: hlts2 --- .../db/storage/blob/s3/reader/reader_mock.go | 17 + .../db/storage/blob/s3/reader/reader_test.go | 325 ++++++++++++++++-- 2 files changed, 304 insertions(+), 38 deletions(-) create mode 100644 internal/db/storage/blob/s3/reader/reader_mock.go diff --git a/internal/db/storage/blob/s3/reader/reader_mock.go b/internal/db/storage/blob/s3/reader/reader_mock.go new file mode 100644 index 0000000000..36af4e3d21 --- /dev/null +++ b/internal/db/storage/blob/s3/reader/reader_mock.go @@ -0,0 +1,17 @@ +package reader + +// MockReadCloser represents mock of io.ReadCloser. +type MockReadCloser struct { + ReadFunc func(p []byte) (n int, err error) + CloseFunc func() error +} + +// Read calls ReadFunc. +func (m *MockReadCloser) Read(p []byte) (n int, err error) { + return m.ReadFunc(p) +} + +// Close calls CloseFunc. +func (m *MockReadCloser) Close() error { + return m.CloseFunc() +} diff --git a/internal/db/storage/blob/s3/reader/reader_test.go b/internal/db/storage/blob/s3/reader/reader_test.go index d65d4d1bd6..d6b53b39de 100644 --- a/internal/db/storage/blob/s3/reader/reader_test.go +++ b/internal/db/storage/blob/s3/reader/reader_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/aws/aws-sdk-go/service/s3" + "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "go.uber.org/goleak" @@ -230,18 +231,237 @@ func Test_reader_Close(t *testing.T) { } return nil } + tests := []test{ + { + name: "returns nil when close success", + fields: fields{ + pr: &MockReadCloser{ + CloseFunc: func() error { + return nil + }, + }, + }, + want: want{ + err: nil, + }, + }, + + { + name: "returns nil when close is nil", + fields: fields{ + wg: new(sync.WaitGroup), + }, + want: want{ + err: nil, + }, + }, + + { + name: "returns nil when close fails", + fields: fields{ + pr: &MockReadCloser{ + CloseFunc: func() error { + return errors.New("err") + }, + }, + }, + want: want{ + err: errors.New("err"), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + r := &reader{ + eg: test.fields.eg, + service: test.fields.service, + bucket: test.fields.bucket, + key: test.fields.key, + pr: test.fields.pr, + wg: test.fields.wg, + } + + err := r.Close() + if err := test.checkFunc(test.want, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_reader_Read(t *testing.T) { + type args struct { + p []byte + } + type fields struct { + eg errgroup.Group + service *s3.S3 + bucket string + key string + pr io.ReadCloser + wg *sync.WaitGroup + } + type want struct { + wantN int + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, int, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, gotN int, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(gotN, w.wantN) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotN, w.wantN) + } + return nil + } + tests := []test{ + { + name: "returns (10, nil) when read success", + args: args{ + p: []byte{}, + }, + fields: fields{ + pr: &MockReadCloser{ + ReadFunc: func(p []byte) (n int, err error) { + return 10, nil + }, + }, + }, + want: want{ + wantN: 10, + err: nil, + }, + }, + + { + name: "returns error when read fails", + args: args{ + p: []byte{}, + }, + fields: fields{ + pr: &MockReadCloser{ + ReadFunc: func(p []byte) (n int, err error) { + return 0, errors.New("err") + }, + }, + }, + want: want{ + wantN: 0, + err: errors.New("err"), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + r := &reader{ + eg: test.fields.eg, + service: test.fields.service, + bucket: test.fields.bucket, + key: test.fields.key, + pr: test.fields.pr, + wg: test.fields.wg, + } + + gotN, err := r.Read(test.args.p) + if err := test.checkFunc(test.want, gotN, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_reader_getObjectWithBackoff(t *testing.T) { + type args struct { + ctx context.Context + offset int64 + length int64 + } + type fields struct { + eg errgroup.Group + service *s3.S3 + bucket string + key string + pr io.ReadCloser + wg *sync.WaitGroup + backoffEnabled bool + backoffOpts []backoff.Option + maxChunkSize int64 + } + type want struct { + want io.Reader + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, io.Reader, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got io.Reader, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) + } + return nil + } tests := []test{ // TODO test cases /* { name: "test_case_1", + args: args { + ctx: nil, + offset: 0, + length: 0, + }, fields: fields { eg: nil, service: nil, bucket: "", key: "", pr: nil, - wg: sync.WaitGroup{}, + wg: nil, + backoffEnabled: false, + backoffOpts: nil, + maxChunkSize: 0, }, want: want{}, checkFunc: defaultCheckFunc, @@ -253,13 +473,21 @@ func Test_reader_Close(t *testing.T) { func() test { return test { name: "test_case_2", + args: args { + ctx: nil, + offset: 0, + length: 0, + }, fields: fields { eg: nil, service: nil, bucket: "", key: "", pr: nil, - wg: sync.WaitGroup{}, + wg: nil, + backoffEnabled: false, + backoffOpts: nil, + maxChunkSize: 0, }, want: want{}, checkFunc: defaultCheckFunc, @@ -272,25 +500,28 @@ func Test_reader_Close(t *testing.T) { t.Run(test.name, func(tt *testing.T) { defer goleak.VerifyNone(tt) if test.beforeFunc != nil { - test.beforeFunc() + test.beforeFunc(test.args) } if test.afterFunc != nil { - defer test.afterFunc() + defer test.afterFunc(test.args) } if test.checkFunc == nil { test.checkFunc = defaultCheckFunc } r := &reader{ - eg: test.fields.eg, - service: test.fields.service, - bucket: test.fields.bucket, - key: test.fields.key, - pr: test.fields.pr, - wg: test.fields.wg, + eg: test.fields.eg, + service: test.fields.service, + bucket: test.fields.bucket, + key: test.fields.key, + pr: test.fields.pr, + wg: test.fields.wg, + backoffEnabled: test.fields.backoffEnabled, + backoffOpts: test.fields.backoffOpts, + maxChunkSize: test.fields.maxChunkSize, } - err := r.Close() - if err := test.checkFunc(test.want, err); err != nil { + got, err := r.getObjectWithBackoff(test.args.ctx, test.args.offset, test.args.length) + if err := test.checkFunc(test.want, got, err); err != nil { tt.Errorf("error = %v", err) } @@ -298,37 +529,42 @@ func Test_reader_Close(t *testing.T) { } } -func Test_reader_Read(t *testing.T) { +func Test_reader_getObject(t *testing.T) { type args struct { - p []byte + ctx context.Context + offset int64 + length int64 } type fields struct { - eg errgroup.Group - service *s3.S3 - bucket string - key string - pr io.ReadCloser - wg *sync.WaitGroup + eg errgroup.Group + service *s3.S3 + bucket string + key string + pr io.ReadCloser + wg *sync.WaitGroup + backoffEnabled bool + backoffOpts []backoff.Option + maxChunkSize int64 } type want struct { - wantN int - err error + want io.Reader + err error } type test struct { name string args args fields fields want want - checkFunc func(want, int, error) error + checkFunc func(want, io.Reader, error) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, gotN int, err error) error { + defaultCheckFunc := func(w want, got io.Reader, err error) error { if !errors.Is(err, w.err) { return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) } - if !reflect.DeepEqual(gotN, w.wantN) { - return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotN, w.wantN) + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) } return nil } @@ -338,7 +574,9 @@ func Test_reader_Read(t *testing.T) { { name: "test_case_1", args: args { - p: nil, + ctx: nil, + offset: 0, + length: 0, }, fields: fields { eg: nil, @@ -346,7 +584,10 @@ func Test_reader_Read(t *testing.T) { bucket: "", key: "", pr: nil, - wg: sync.WaitGroup{}, + wg: nil, + backoffEnabled: false, + backoffOpts: nil, + maxChunkSize: 0, }, want: want{}, checkFunc: defaultCheckFunc, @@ -359,7 +600,9 @@ func Test_reader_Read(t *testing.T) { return test { name: "test_case_2", args: args { - p: nil, + ctx: nil, + offset: 0, + length: 0, }, fields: fields { eg: nil, @@ -367,7 +610,10 @@ func Test_reader_Read(t *testing.T) { bucket: "", key: "", pr: nil, - wg: sync.WaitGroup{}, + wg: nil, + backoffEnabled: false, + backoffOpts: nil, + maxChunkSize: 0, }, want: want{}, checkFunc: defaultCheckFunc, @@ -389,16 +635,19 @@ func Test_reader_Read(t *testing.T) { test.checkFunc = defaultCheckFunc } r := &reader{ - eg: test.fields.eg, - service: test.fields.service, - bucket: test.fields.bucket, - key: test.fields.key, - pr: test.fields.pr, - wg: test.fields.wg, + eg: test.fields.eg, + service: test.fields.service, + bucket: test.fields.bucket, + key: test.fields.key, + pr: test.fields.pr, + wg: test.fields.wg, + backoffEnabled: test.fields.backoffEnabled, + backoffOpts: test.fields.backoffOpts, + maxChunkSize: test.fields.maxChunkSize, } - gotN, err := r.Read(test.args.p) - if err := test.checkFunc(test.want, gotN, err); err != nil { + got, err := r.getObject(test.args.ctx, test.args.offset, test.args.length) + if err := test.checkFunc(test.want, got, err); err != nil { tt.Errorf("error = %v", err) } From 6515c5704e86f34a4335a1c4648aaa7a9071a591 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 23 Sep 2020 18:08:55 +0900 Subject: [PATCH 02/13] feat: add test and mock Signed-off-by: hlts2 --- internal/db/storage/blob/s3/reader/io/io.go | 31 + internal/db/storage/blob/s3/reader/reader.go | 11 +- .../db/storage/blob/s3/reader/reader_mock.go | 37 + .../db/storage/blob/s3/reader/reader_test.go | 674 +++++++++++++----- 4 files changed, 587 insertions(+), 166 deletions(-) create mode 100644 internal/db/storage/blob/s3/reader/io/io.go diff --git a/internal/db/storage/blob/s3/reader/io/io.go b/internal/db/storage/blob/s3/reader/io/io.go new file mode 100644 index 0000000000..ce45a05639 --- /dev/null +++ b/internal/db/storage/blob/s3/reader/io/io.go @@ -0,0 +1,31 @@ +package io + +import ( + "context" + "io" + + iio "github.com/vdaas/vald/internal/io" +) + +// IO represents an interface for context io. +type IO interface { + NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) + NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) +} + +type ctxio struct{} + +// New returns CtxIO implementation. +func New() IO { + return new(ctxio) +} + +// NewReaderWithContext calls io.NewReaderWithContext. +func (*ctxio) NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) { + return iio.NewReaderWithContext(ctx, r) +} + +// NewReadCNewReadCloserWithContext calls io.NewReadCloserWithContext. +func (*ctxio) NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return iio.NewReadCloserWithContext(ctx, r) +} diff --git a/internal/db/storage/blob/s3/reader/reader.go b/internal/db/storage/blob/s3/reader/reader.go index cd6c8e123b..244f85d6a8 100644 --- a/internal/db/storage/blob/s3/reader/reader.go +++ b/internal/db/storage/blob/s3/reader/reader.go @@ -27,23 +27,26 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/vdaas/vald/internal/backoff" + ctxio "github.com/vdaas/vald/internal/db/storage/blob/s3/reader/io" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" - ctxio "github.com/vdaas/vald/internal/io" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" ) type reader struct { eg errgroup.Group - service *s3.S3 + service s3iface.S3API bucket string key string pr io.ReadCloser wg *sync.WaitGroup + ctxio ctxio.IO + backoffEnabled bool backoffOpts []backoff.Option maxChunkSize int64 @@ -94,7 +97,7 @@ func (r *reader) Open(ctx context.Context) (err error) { return err } - body, err = ctxio.NewReaderWithContext(ctx, body) + body, err = r.ctxio.NewReaderWithContext(ctx, body) if err != nil { return err } @@ -164,7 +167,7 @@ func (r *reader) getObject(ctx context.Context, offset, length int64) (io.Reader return nil, err } - res, err := ctxio.NewReadCloserWithContext(ctx, resp.Body) + res, err := r.ctxio.NewReadCloserWithContext(ctx, resp.Body) if err != nil { return nil, err } diff --git a/internal/db/storage/blob/s3/reader/reader_mock.go b/internal/db/storage/blob/s3/reader/reader_mock.go index 36af4e3d21..5d876002e1 100644 --- a/internal/db/storage/blob/s3/reader/reader_mock.go +++ b/internal/db/storage/blob/s3/reader/reader_mock.go @@ -1,5 +1,42 @@ package reader +import ( + "context" + "io" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" +) + +// MockS3API represents mock of s3iface.MMockS3API. +type MockS3API struct { + s3iface.S3API + GetObjectWithContextFunc func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) +} + +// GetObjectWithContext calls GetObjectWithContextFunc. +func (m *MockS3API) GetObjectWithContext(ctx aws.Context, in *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) { + return m.GetObjectWithContextFunc(ctx, in, opts...) +} + +// MockIO represents mock of io.IO +type MockIO struct { + NewReaderWithContextFunc func(ctx context.Context, r io.Reader) (io.Reader, error) + NewReadCloserWithContextFunc func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) +} + +// NewReaderWithContext calls NewReaderWithContextFunc. +func (m *MockIO) NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) { + return m.NewReaderWithContextFunc(ctx, r) +} + +// NewReadCloserWithContext calls NewReadCloserWithContextFunc. +func (m *MockIO) NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return m.NewReadCloserWithContextFunc(ctx, r) +} + // MockReadCloser represents mock of io.ReadCloser. type MockReadCloser struct { ReadFunc func(p []byte) (n int, err error) diff --git a/internal/db/storage/blob/s3/reader/reader_test.go b/internal/db/storage/blob/s3/reader/reader_test.go index d6b53b39de..9bd200c8e2 100644 --- a/internal/db/storage/blob/s3/reader/reader_test.go +++ b/internal/db/storage/blob/s3/reader/reader_test.go @@ -17,19 +17,33 @@ package reader import ( + "bytes" "context" "io" + "io/ioutil" + "os" "reflect" "sync" "testing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/vdaas/vald/internal/backoff" + ctxio "github.com/vdaas/vald/internal/db/storage/blob/s3/reader/io" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" "go.uber.org/goleak" ) +func TestMain(m *testing.M) { + log.Init() + os.Exit(m.Run()) +} + func TestNew(t *testing.T) { type args struct { opts []Option @@ -106,12 +120,15 @@ func Test_reader_Open(t *testing.T) { ctx context.Context } type fields struct { - eg errgroup.Group - service *s3.S3 - bucket string - key string - pr io.ReadCloser - wg *sync.WaitGroup + eg errgroup.Group + backoffEnabled bool + service s3iface.S3API + bucket string + key string + pr io.ReadCloser + wg *sync.WaitGroup + ctxio ctxio.IO + backoffOpts []backoff.Option } type want struct { err error @@ -123,7 +140,7 @@ func Test_reader_Open(t *testing.T) { want want checkFunc func(want, error) error beforeFunc func(args) - afterFunc func(args) + afterFunc func(args, *testing.T) } defaultCheckFunc := func(w want, err error) error { if !errors.Is(err, w.err) { @@ -132,75 +149,237 @@ func Test_reader_Open(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - ctx: nil, - }, - fields: fields { - eg: nil, - service: nil, - bucket: "", - key: "", - pr: nil, - wg: sync.WaitGroup{}, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ + func() test { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + eg, _ := errgroup.New(ctx) + + return test{ + name: "returns nil when context was canceled", + args: args{ + ctx: cctx, + }, + fields: fields{ + eg: eg, + }, + want: want{ + err: nil, + }, + beforeFunc: func(args) { + cancel() + }, + afterFunc: func(_ args, t *testing.T) { + t.Helper() + if err := eg.Wait(); !errors.Is(err, context.Canceled) { + t.Errorf("want: %v, but got: %v", context.Canceled, err) + } + }, + } + }(), + + func() test { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + eg, _ := errgroup.New(ctx) + + wantErr := errors.New("err") + return test{ + name: "returns nil when backoff enable and s3 service returns error", + args: args{ + ctx: cctx, + }, + fields: fields{ + eg: eg, + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return nil, wantErr + }, + }, + backoffEnabled: true, + backoffOpts: []backoff.Option{ + backoff.WithRetryCount(1), + }, + }, + want: want{ + err: nil, + }, + afterFunc: func(_ args, t *testing.T) { + t.Helper() - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - ctx: nil, - }, - fields: fields { - eg: nil, - service: nil, - bucket: "", - key: "", - pr: nil, - wg: sync.WaitGroup{}, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + if err := eg.Wait(); !errors.Is(err, wantErr) { + t.Errorf("want: %v, but got: %v", wantErr, err) + } + + cancel() + }, + } + }(), + + func() test { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + eg, _ := errgroup.New(ctx) + + wantErr := errors.New("err") + return test{ + name: "returns nil when backoff disable and s3 service returns error", + args: args{ + ctx: cctx, + }, + fields: fields{ + eg: eg, + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return nil, wantErr + }, + }, + }, + want: want{ + err: nil, + }, + afterFunc: func(_ args, t *testing.T) { + t.Helper() + + if err := eg.Wait(); !errors.Is(err, wantErr) { + t.Errorf("want: %v, but got: %v", wantErr, err) + } + + cancel() + }, + } + }(), + + func() test { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + eg, _ := errgroup.New(ctx) + + wantErr := errors.New("err") + return test{ + name: "returns nil when backoff disable and reader copy fails", + args: args{ + ctx: cctx, + }, + fields: fields{ + eg: eg, + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReaderWithContextFunc: func(ctx context.Context, r io.Reader) (io.Reader, error) { + return nil, wantErr + }, + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return &MockReadCloser{ + CloseFunc: func() error { + return nil + }, + ReadFunc: func(p []byte) (n int, err error) { + return 1, io.EOF + }, + }, nil + }, + }, + }, + want: want{ + err: nil, + }, + afterFunc: func(_ args, t *testing.T) { + t.Helper() + + if err := eg.Wait(); !errors.Is(err, wantErr) { + t.Errorf("want: %v, but got: %v", wantErr, err) + } + + cancel() + }, + } + }(), + + func() test { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + eg, _ := errgroup.New(ctx) + + wantErr := errors.New("err") + return test{ + name: "returns nil when backoff disable and reader copy fails", + args: args{ + ctx: cctx, + }, + fields: fields{ + eg: eg, + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReaderWithContextFunc: func(ctx context.Context, r io.Reader) (io.Reader, error) { + return &MockReadCloser{ + ReadFunc: func(p []byte) (n int, err error) { + return 0, wantErr + }, + }, nil + }, + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return &MockReadCloser{ + CloseFunc: func() error { + return nil + }, + ReadFunc: func(p []byte) (n int, err error) { + return 1, io.EOF + }, + }, nil + }, + }, + }, + want: want{ + err: nil, + }, + afterFunc: func(_ args, t *testing.T) { + t.Helper() + + if err := eg.Wait(); !errors.Is(err, wantErr) { + t.Errorf("want: %v, but got: %v", wantErr, err) + } + + cancel() + }, + } + }(), } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } if test.afterFunc != nil { - defer test.afterFunc(test.args) + defer test.afterFunc(test.args, t) } if test.checkFunc == nil { test.checkFunc = defaultCheckFunc } r := &reader{ - eg: test.fields.eg, - service: test.fields.service, - bucket: test.fields.bucket, - key: test.fields.key, - pr: test.fields.pr, - wg: test.fields.wg, + eg: test.fields.eg, + service: test.fields.service, + bucket: test.fields.bucket, + key: test.fields.key, + pr: test.fields.pr, + wg: test.fields.wg, + ctxio: test.fields.ctxio, + backoffEnabled: test.fields.backoffEnabled, + backoffOpts: test.fields.backoffOpts, } err := r.Open(test.args.ctx) if err := test.checkFunc(test.want, err); err != nil { tt.Errorf("error = %v", err) } - }) } } @@ -411,7 +590,7 @@ func Test_reader_getObjectWithBackoff(t *testing.T) { } type fields struct { eg errgroup.Group - service *s3.S3 + service s3iface.S3API bucket string key string pr io.ReadCloser @@ -419,6 +598,7 @@ func Test_reader_getObjectWithBackoff(t *testing.T) { backoffEnabled bool backoffOpts []backoff.Option maxChunkSize int64 + ctxio ctxio.IO } type want struct { want io.Reader @@ -443,62 +623,70 @@ func Test_reader_getObjectWithBackoff(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - ctx: nil, - offset: 0, - length: 0, - }, - fields: fields { - eg: nil, - service: nil, - bucket: "", - key: "", - pr: nil, - wg: nil, - backoffEnabled: false, - backoffOpts: nil, - maxChunkSize: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ + { + name: "returns error when s3 service returns error and backoff fails", + args: args{ + ctx: context.Background(), + offset: 1, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return &MockReadCloser{ + CloseFunc: func() error { + return nil + }, + ReadFunc: func(p []byte) (n int, err error) { + return 1, io.EOF + }, + }, nil + }, + }, + }, + want: want{ + want: func() *bytes.Buffer { + buf := new(bytes.Buffer) + var b byte + buf.WriteByte(b) + return buf + }(), + err: nil, + }, + }, - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - ctx: nil, - offset: 0, - length: 0, - }, - fields: fields { - eg: nil, - service: nil, - bucket: "", - key: "", - pr: nil, - wg: nil, - backoffEnabled: false, - backoffOpts: nil, - maxChunkSize: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "returns error when s3 service returns error and backoff fails", + args: args{ + ctx: context.Background(), + offset: 1, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return nil, errors.New("err") + }, + }, + backoffEnabled: false, + backoffOpts: []backoff.Option{ + backoff.WithRetryCount(1), + }, + }, + want: want{ + err: errors.New("err"), + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -518,13 +706,13 @@ func Test_reader_getObjectWithBackoff(t *testing.T) { backoffEnabled: test.fields.backoffEnabled, backoffOpts: test.fields.backoffOpts, maxChunkSize: test.fields.maxChunkSize, + ctxio: test.fields.ctxio, } got, err := r.getObjectWithBackoff(test.args.ctx, test.args.offset, test.args.length) if err := test.checkFunc(test.want, got, err); err != nil { tt.Errorf("error = %v", err) } - }) } } @@ -537,7 +725,7 @@ func Test_reader_getObject(t *testing.T) { } type fields struct { eg errgroup.Group - service *s3.S3 + service s3iface.S3API bucket string key string pr io.ReadCloser @@ -545,6 +733,7 @@ func Test_reader_getObject(t *testing.T) { backoffEnabled bool backoffOpts []backoff.Option maxChunkSize int64 + ctxio ctxio.IO } type want struct { want io.Reader @@ -569,62 +758,223 @@ func Test_reader_getObject(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - ctx: nil, - offset: 0, - length: 0, - }, - fields: fields { - eg: nil, - service: nil, - bucket: "", - key: "", - pr: nil, - wg: nil, - backoffEnabled: false, - backoffOpts: nil, - maxChunkSize: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ + { + name: "returns (io.Reader, nil) when no error occurs", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return &MockReadCloser{ + CloseFunc: func() error { + return nil + }, + ReadFunc: func(p []byte) (n int, err error) { + return 1, io.EOF + }, + }, nil + }, + }, + }, + want: want{ + want: func() *bytes.Buffer { + buf := new(bytes.Buffer) + var b byte + buf.WriteByte(b) + return buf + }(), + err: nil, + }, + }, - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - ctx: nil, - offset: 0, - length: 0, - }, - fields: fields { - eg: nil, - service: nil, - bucket: "", - key: "", - pr: nil, - wg: nil, - backoffEnabled: false, - backoffOpts: nil, - maxChunkSize: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "returns (io.Reader, nil) when reader close error occurs and output warning", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return &MockReadCloser{ + CloseFunc: func() error { + return errors.New("err") + }, + ReadFunc: func(p []byte) (n int, err error) { + return 1, io.EOF + }, + }, nil + }, + }, + }, + want: want{ + want: func() *bytes.Buffer { + buf := new(bytes.Buffer) + var b byte + buf.WriteByte(b) + return buf + }(), + err: nil, + }, + }, + + { + name: "returns nil when s3 service returns error and error code is ErrBlobNoSuchBucket", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return nil, awserr.New(s3.ErrCodeNoSuchBucket, "", nil) + }, + }, + bucket: "vald", + }, + want: want{ + want: ioutil.NopCloser(bytes.NewReader(nil)), + err: nil, + }, + }, + + { + name: "returns nil when s3 service returns error and error code is ErrCodeNoSuchKey", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return nil, awserr.New(s3.ErrCodeNoSuchKey, "", nil) + }, + }, + key: "vald", + }, + want: want{ + want: ioutil.NopCloser(bytes.NewReader(nil)), + err: nil, + }, + }, + + { + name: "returns nil when s3 service returns error and error code is ErrCodeNoSuchKey", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return nil, awserr.New("InvalidRange", "", nil) + }, + }, + }, + want: want{ + want: ioutil.NopCloser(bytes.NewReader(nil)), + err: nil, + }, + }, + + { + name: "returns s3 error when s3 service returns error and error code is `Invalid`", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return nil, awserr.New("Invalid", "", nil) + }, + }, + }, + want: want{ + want: nil, + err: awserr.New("Invalid", "", nil), + }, + }, + + { + name: "returns error when reader creation fails", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return nil, errors.New("err") + }, + }, + }, + want: want{ + want: nil, + err: errors.New("err"), + }, + }, + + { + name: "returns error when failed to copy to buffer", + args: args{ + ctx: context.Background(), + offset: 2, + length: 10, + }, + fields: fields{ + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return &MockReadCloser{ + CloseFunc: func() error { + return nil + }, + ReadFunc: func(p []byte) (n int, err error) { + return 0, errors.New("err") + }, + }, nil + }, + }, + }, + want: want{ + want: nil, + err: errors.New("err"), + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -644,13 +994,13 @@ func Test_reader_getObject(t *testing.T) { backoffEnabled: test.fields.backoffEnabled, backoffOpts: test.fields.backoffOpts, maxChunkSize: test.fields.maxChunkSize, + ctxio: test.fields.ctxio, } got, err := r.getObject(test.args.ctx, test.args.offset, test.args.length) if err := test.checkFunc(test.want, got, err); err != nil { tt.Errorf("error = %v", err) } - }) } } From 5994cd605fbef57d5ac3ae3d84fe95d69e42e83b Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 11:42:22 +0900 Subject: [PATCH 03/13] feat: add test case Signed-off-by: hlts2 --- .../db/storage/blob/s3/reader/reader_test.go | 56 ++++++++++--------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/internal/db/storage/blob/s3/reader/reader_test.go b/internal/db/storage/blob/s3/reader/reader_test.go index 9bd200c8e2..4dba29b842 100644 --- a/internal/db/storage/blob/s3/reader/reader_test.go +++ b/internal/db/storage/blob/s3/reader/reader_test.go @@ -66,36 +66,40 @@ func TestNew(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - opts: nil, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - opts: nil, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "returns writer when option is empty", + args: args{ + opts: nil, + }, + want: want{ + want: &reader{ + eg: errgroup.Get(), + maxChunkSize: 512 * 1024 * 1024, + backoffEnabled: false, + }, + }, + }, + + { + name: "returns writer when option is not empty", + args: args{ + opts: []Option{ + WithBackoff(true), + }, + }, + want: want{ + want: &reader{ + eg: errgroup.Get(), + maxChunkSize: 512 * 1024 * 1024, + backoffEnabled: true, + }, + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } From 46fdcd80832b224e14a6e8dbc658cfc86b887384 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 12:42:38 +0900 Subject: [PATCH 04/13] feat: add comment Signed-off-by: hlts2 --- internal/db/storage/blob/s3/reader/io/io.go | 6 +++--- internal/db/storage/blob/s3/reader/reader.go | 6 ++++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/db/storage/blob/s3/reader/io/io.go b/internal/db/storage/blob/s3/reader/io/io.go index ce45a05639..c2a3e61583 100644 --- a/internal/db/storage/blob/s3/reader/io/io.go +++ b/internal/db/storage/blob/s3/reader/io/io.go @@ -7,7 +7,7 @@ import ( iio "github.com/vdaas/vald/internal/io" ) -// IO represents an interface for context io. +// IO represents an interface to create object for io. type IO interface { NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) @@ -15,7 +15,7 @@ type IO interface { type ctxio struct{} -// New returns CtxIO implementation. +// New returns IO implementation. func New() IO { return new(ctxio) } @@ -25,7 +25,7 @@ func (*ctxio) NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, return iio.NewReaderWithContext(ctx, r) } -// NewReadCNewReadCloserWithContext calls io.NewReadCloserWithContext. +// NewReadCloserWithContext calls io.NewReadCloserWithContext. func (*ctxio) NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { return iio.NewReadCloserWithContext(ctx, r) } diff --git a/internal/db/storage/blob/s3/reader/reader.go b/internal/db/storage/blob/s3/reader/reader.go index 244f85d6a8..e29ac91593 100644 --- a/internal/db/storage/blob/s3/reader/reader.go +++ b/internal/db/storage/blob/s3/reader/reader.go @@ -52,11 +52,13 @@ type reader struct { maxChunkSize int64 } +// Reader is an interface that groups the basic Read and Close and Open methods. type Reader interface { Open(ctx context.Context) error io.ReadCloser } +// New returns Reader implementation. func New(opts ...Option) Reader { r := new(reader) for _, opt := range append(defaultOpts, opts...) { @@ -66,6 +68,8 @@ func New(opts ...Option) Reader { return r } +// Open creates io.Pipe. After reading the data from s3, make it available with Read method. +// Open method returns an error to align the interface, but it doesn't actually return an error. func (r *reader) Open(ctx context.Context) (err error) { var pw io.WriteCloser @@ -189,6 +193,7 @@ func (r *reader) getObject(ctx context.Context, offset, length int64) (io.Reader return buf, nil } +// Close closes the reader. func (r *reader) Close() error { if r.pr != nil { return r.pr.Close() @@ -201,6 +206,7 @@ func (r *reader) Close() error { return nil } +// Read reads up to len(p) bytes into p. It returns the number of bytes. func (r *reader) Read(p []byte) (n int, err error) { if r.pr == nil { return 0, errors.ErrStorageReaderNotOpened From dc8fd5e0a4790afcf000aed043d100a3dcd75543 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 15:42:43 +0900 Subject: [PATCH 05/13] fix: test case Signed-off-by: hlts2 --- .../db/storage/blob/s3/reader/reader_mock.go | 6 +- .../db/storage/blob/s3/reader/reader_test.go | 121 ++++++++++++++++-- 2 files changed, 111 insertions(+), 16 deletions(-) diff --git a/internal/db/storage/blob/s3/reader/reader_mock.go b/internal/db/storage/blob/s3/reader/reader_mock.go index 5d876002e1..1d3430160b 100644 --- a/internal/db/storage/blob/s3/reader/reader_mock.go +++ b/internal/db/storage/blob/s3/reader/reader_mock.go @@ -10,7 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" ) -// MockS3API represents mock of s3iface.MMockS3API. +// MockS3API represents mock for s3iface.MMockS3API. type MockS3API struct { s3iface.S3API GetObjectWithContextFunc func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) @@ -21,7 +21,7 @@ func (m *MockS3API) GetObjectWithContext(ctx aws.Context, in *s3.GetObjectInput, return m.GetObjectWithContextFunc(ctx, in, opts...) } -// MockIO represents mock of io.IO +// MockIO represents mock for io.IO type MockIO struct { NewReaderWithContextFunc func(ctx context.Context, r io.Reader) (io.Reader, error) NewReadCloserWithContextFunc func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) @@ -37,7 +37,7 @@ func (m *MockIO) NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) return m.NewReadCloserWithContextFunc(ctx, r) } -// MockReadCloser represents mock of io.ReadCloser. +// MockReadCloser represents mock for io.ReadCloser. type MockReadCloser struct { ReadFunc func(p []byte) (n int, err error) CloseFunc func() error diff --git a/internal/db/storage/blob/s3/reader/reader_test.go b/internal/db/storage/blob/s3/reader/reader_test.go index 4dba29b842..46b43eb2d7 100644 --- a/internal/db/storage/blob/s3/reader/reader_test.go +++ b/internal/db/storage/blob/s3/reader/reader_test.go @@ -67,7 +67,7 @@ func TestNew(t *testing.T) { } tests := []test{ { - name: "returns writer when option is empty", + name: "returns reader when option is empty", args: args{ opts: nil, }, @@ -81,7 +81,7 @@ func TestNew(t *testing.T) { }, { - name: "returns writer when option is not empty", + name: "returns reader when option is not empty", args: args{ opts: []Option{ WithBackoff(true), @@ -133,6 +133,7 @@ func Test_reader_Open(t *testing.T) { wg *sync.WaitGroup ctxio ctxio.IO backoffOpts []backoff.Option + maxChunkSize int64 } type want struct { err error @@ -145,6 +146,7 @@ func Test_reader_Open(t *testing.T) { checkFunc func(want, error) error beforeFunc func(args) afterFunc func(args, *testing.T) + hookFunc func(*reader) } defaultCheckFunc := func(w want, err error) error { if !errors.Is(err, w.err) { @@ -159,7 +161,7 @@ func Test_reader_Open(t *testing.T) { eg, _ := errgroup.New(ctx) return test{ - name: "returns nil when context was canceled", + name: "returns nil when context is canceled", args: args{ ctx: cctx, }, @@ -188,7 +190,7 @@ func Test_reader_Open(t *testing.T) { wantErr := errors.New("err") return test{ - name: "returns nil when backoff enable and s3 service returns error", + name: "returns nil when backoff is enabled and s3 service returns an error", args: args{ ctx: cctx, }, @@ -226,7 +228,7 @@ func Test_reader_Open(t *testing.T) { wantErr := errors.New("err") return test{ - name: "returns nil when backoff disable and s3 service returns error", + name: "returns nil when backoff is disabled and s3 service returns an error", args: args{ ctx: cctx, }, @@ -260,7 +262,7 @@ func Test_reader_Open(t *testing.T) { wantErr := errors.New("err") return test{ - name: "returns nil when backoff disable and reader copy fails", + name: "returns nil when backoff is disabled and the reader creation fails", args: args{ ctx: cctx, }, @@ -309,7 +311,7 @@ func Test_reader_Open(t *testing.T) { wantErr := errors.New("err") return test{ - name: "returns nil when backoff disable and reader copy fails", + name: "returns nil when backoff is disabled and the reader copy fails", args: args{ ctx: cctx, }, @@ -334,7 +336,7 @@ func Test_reader_Open(t *testing.T) { return nil }, ReadFunc: func(p []byte) (n int, err error) { - return 1, io.EOF + return 0, io.EOF }, }, nil }, @@ -354,6 +356,84 @@ func Test_reader_Open(t *testing.T) { }, } }(), + + func() test { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + eg, _ := errgroup.New(ctx) + + roopCnt := 0 + return test{ + name: "returns nil when backoff is disable and multiple reads success", + args: args{ + ctx: cctx, + }, + fields: fields{ + eg: eg, + maxChunkSize: 10, + service: &MockS3API{ + GetObjectWithContextFunc: func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) { + return new(s3.GetObjectOutput), nil + }, + }, + ctxio: &MockIO{ + NewReaderWithContextFunc: func(ctx context.Context, r io.Reader) (io.Reader, error) { + return &MockReadCloser{ + ReadFunc: func(p []byte) (n int, err error) { + if roopCnt == 0 { + roopCnt++ + return 10, io.EOF + } + return 0, io.EOF + }, + }, nil + }, + NewReadCloserWithContextFunc: func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) { + return &MockReadCloser{ + CloseFunc: func() error { + return nil + }, + ReadFunc: func(p []byte) (n int, err error) { + return 10, io.EOF + }, + }, nil + }, + }, + }, + want: want{ + err: nil, + }, + hookFunc: func(r *reader) { + go func() { + bytes := [][]byte{ + make([]byte, 10), + make([]byte, 0), + } + for { + select { + case <-cctx.Done(): + return + default: + if roopCnt == 0 { + r.Read(bytes[0]) + } else { + r.Read(bytes[1]) + } + } + } + }() + }, + afterFunc: func(_ args, t *testing.T) { + t.Helper() + + if err := eg.Wait(); err != nil { + t.Errorf("want: %v, but got: %v", nil, err) + } + + cancel() + }, + } + }(), } for _, test := range tests { @@ -378,9 +458,13 @@ func Test_reader_Open(t *testing.T) { ctxio: test.fields.ctxio, backoffEnabled: test.fields.backoffEnabled, backoffOpts: test.fields.backoffOpts, + maxChunkSize: test.fields.maxChunkSize, } err := r.Open(test.args.ctx) + if test.hookFunc != nil { + test.hookFunc(r) + } if err := test.checkFunc(test.want, err); err != nil { tt.Errorf("error = %v", err) } @@ -430,7 +514,7 @@ func Test_reader_Close(t *testing.T) { }, { - name: "returns nil when close is nil", + name: "returns nil when the close is nil", fields: fields{ wg: new(sync.WaitGroup), }, @@ -440,7 +524,7 @@ func Test_reader_Close(t *testing.T) { }, { - name: "returns nil when close fails", + name: "returns nil when the close fails", fields: fields{ pr: &MockReadCloser{ CloseFunc: func() error { @@ -537,6 +621,17 @@ func Test_reader_Read(t *testing.T) { }, }, + { + name: "returns error when read is nil", + args: args{ + p: []byte{}, + }, + want: want{ + wantN: 0, + err: errors.ErrStorageReaderNotOpened, + }, + }, + { name: "returns error when read fails", args: args{ @@ -628,7 +723,7 @@ func Test_reader_getObjectWithBackoff(t *testing.T) { } tests := []test{ { - name: "returns error when s3 service returns error and backoff fails", + name: "returns (Reader, nil) when no error occurs", args: args{ ctx: context.Background(), offset: 1, @@ -763,7 +858,7 @@ func Test_reader_getObject(t *testing.T) { } tests := []test{ { - name: "returns (io.Reader, nil) when no error occurs", + name: "returns (Reader, nil) when no error occurs", args: args{ ctx: context.Background(), offset: 2, @@ -800,7 +895,7 @@ func Test_reader_getObject(t *testing.T) { }, { - name: "returns (io.Reader, nil) when reader close error occurs and output warning", + name: "returns (Reader, nil) when the reader close error occurs and output warning", args: args{ ctx: context.Background(), offset: 2, From 8c20f1627553f6ffeb45a4545c0548b973a24116 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 15:57:39 +0900 Subject: [PATCH 06/13] fix: import path Signed-off-by: hlts2 --- internal/db/storage/blob/s3/reader/reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/db/storage/blob/s3/reader/reader.go b/internal/db/storage/blob/s3/reader/reader.go index e29ac91593..3629c97ac9 100644 --- a/internal/db/storage/blob/s3/reader/reader.go +++ b/internal/db/storage/blob/s3/reader/reader.go @@ -26,10 +26,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/vdaas/vald/internal/backoff" ctxio "github.com/vdaas/vald/internal/db/storage/blob/s3/reader/io" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" From 72546f6d3774f7e4fc8fb7a882cc443b2a3ce263 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 16:28:55 +0900 Subject: [PATCH 07/13] feat: add alias for s3 Signed-off-by: hlts2 --- .../db/storage/blob/s3/reader/reader_mock.go | 4 ++-- .../db/storage/blob/s3/reader/reader_test.go | 4 ++-- internal/db/storage/blob/s3/sdk/s3/s3.go | 17 +++++++++++++++-- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/internal/db/storage/blob/s3/reader/reader_mock.go b/internal/db/storage/blob/s3/reader/reader_mock.go index 1d3430160b..446ec5ca16 100644 --- a/internal/db/storage/blob/s3/reader/reader_mock.go +++ b/internal/db/storage/blob/s3/reader/reader_mock.go @@ -6,8 +6,8 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface" ) // MockS3API represents mock for s3iface.MMockS3API. diff --git a/internal/db/storage/blob/s3/reader/reader_test.go b/internal/db/storage/blob/s3/reader/reader_test.go index 46b43eb2d7..9fddae5551 100644 --- a/internal/db/storage/blob/s3/reader/reader_test.go +++ b/internal/db/storage/blob/s3/reader/reader_test.go @@ -29,10 +29,10 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/vdaas/vald/internal/backoff" ctxio "github.com/vdaas/vald/internal/db/storage/blob/s3/reader/io" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" diff --git a/internal/db/storage/blob/s3/sdk/s3/s3.go b/internal/db/storage/blob/s3/sdk/s3/s3.go index 3c6f6312e2..c8ec3f4706 100644 --- a/internal/db/storage/blob/s3/sdk/s3/s3.go +++ b/internal/db/storage/blob/s3/sdk/s3/s3.go @@ -19,5 +19,18 @@ import ( "github.com/aws/aws-sdk-go/service/s3" ) -// S3 is type alias for s3.S3. -type S3 = s3.S3 +type ( + // S3 is type alias for s3.S3. + S3 = s3.S3 + // GetObjectInput is type alias for s3.GetObjectInput. + GetObjectInput = s3.GetObjectInput + // GetObjectOutput is type alias for s3.GetObjectOutput. + GetObjectOutput = s3.GetObjectOutput +) + +const ( + // ErrCodeNoSuchBucket is an alias for s3.ErrCodeNoSuchBucket. + ErrCodeNoSuchBucket = s3.ErrCodeNoSuchBucket + // ErrCodeNoSuchKey is an alias for s3.ErrCodeNoSuchKey. + ErrCodeNoSuchKey = s3.ErrCodeNoSuchKey +) From f9c9836579c93bb0c3456a8c9c848d2bafa8243a Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 16:30:26 +0900 Subject: [PATCH 08/13] fix: lint warning Signed-off-by: hlts2 --- internal/db/storage/blob/s3/reader/reader_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/db/storage/blob/s3/reader/reader_test.go b/internal/db/storage/blob/s3/reader/reader_test.go index 9fddae5551..f1ee8743c5 100644 --- a/internal/db/storage/blob/s3/reader/reader_test.go +++ b/internal/db/storage/blob/s3/reader/reader_test.go @@ -415,9 +415,9 @@ func Test_reader_Open(t *testing.T) { return default: if roopCnt == 0 { - r.Read(bytes[0]) + _, _ = r.Read(bytes[0]) } else { - r.Read(bytes[1]) + _, _ = r.Read(bytes[1]) } } } From 8f4062d95ea8a54a0fc2c6f2e87be8ae19624e4a Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 17:51:02 +0900 Subject: [PATCH 09/13] fix: apply suggestion Signed-off-by: hlts2 --- internal/db/storage/blob/s3/reader/option.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/db/storage/blob/s3/reader/option.go b/internal/db/storage/blob/s3/reader/option.go index d65355c847..dfdaa3e2df 100644 --- a/internal/db/storage/blob/s3/reader/option.go +++ b/internal/db/storage/blob/s3/reader/option.go @@ -17,8 +17,8 @@ package reader import ( - "github.com/aws/aws-sdk-go/service/s3" "github.com/vdaas/vald/internal/backoff" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface" "github.com/vdaas/vald/internal/errgroup" ) @@ -43,7 +43,7 @@ func WithErrGroup(eg errgroup.Group) Option { } // WithService returns the option to set the service. -func WithService(s *s3.S3) Option { +func WithService(s s3iface.S3API) Option { return func(r *reader) { if s != nil { r.service = s From 7e5b1f731e3b3dc3092e726df5f23f316e860ec7 Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Thu, 24 Sep 2020 17:28:23 +0900 Subject: [PATCH 10/13] Update internal/db/storage/blob/s3/reader/reader.go Co-authored-by: Kevin Diu --- internal/db/storage/blob/s3/reader/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/db/storage/blob/s3/reader/reader.go b/internal/db/storage/blob/s3/reader/reader.go index 3629c97ac9..d07b4b0c00 100644 --- a/internal/db/storage/blob/s3/reader/reader.go +++ b/internal/db/storage/blob/s3/reader/reader.go @@ -206,7 +206,7 @@ func (r *reader) Close() error { return nil } -// Read reads up to len(p) bytes into p. It returns the number of bytes. +// Read reads up to len(p) bytes and returns the number of bytes read. func (r *reader) Read(p []byte) (n int, err error) { if r.pr == nil { return 0, errors.ErrStorageReaderNotOpened From 62823a7b23fa7424b042045667b9f05c172d69e8 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 24 Sep 2020 18:12:38 +0900 Subject: [PATCH 11/13] fix: ci error Signed-off-by: hlts2 --- internal/db/storage/blob/s3/reader/option_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/db/storage/blob/s3/reader/option_test.go b/internal/db/storage/blob/s3/reader/option_test.go index f13e2f738c..d924ff6b35 100644 --- a/internal/db/storage/blob/s3/reader/option_test.go +++ b/internal/db/storage/blob/s3/reader/option_test.go @@ -23,6 +23,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/google/go-cmp/cmp" "github.com/vdaas/vald/internal/backoff" + "github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "go.uber.org/goleak" @@ -104,7 +105,7 @@ func TestWithErrGroup(t *testing.T) { func TestWithService(t *testing.T) { type T = reader type args struct { - s *s3.S3 + s s3iface.S3API } type want struct { obj *T From 7ccaf20e1dbf5be9aa6e144df0bd25870747d703 Mon Sep 17 00:00:00 2001 From: vdaas-ci Date: Thu, 1 Oct 2020 05:44:19 +0000 Subject: [PATCH 12/13] :robot: Update license headers / Format go codes and yaml files Signed-off-by: vdaas-ci --- internal/db/storage/blob/s3/reader/io/io.go | 15 +++++++++++++++ internal/db/storage/blob/s3/reader/reader_mock.go | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/internal/db/storage/blob/s3/reader/io/io.go b/internal/db/storage/blob/s3/reader/io/io.go index c2a3e61583..eea615a5b3 100644 --- a/internal/db/storage/blob/s3/reader/io/io.go +++ b/internal/db/storage/blob/s3/reader/io/io.go @@ -1,3 +1,18 @@ +// +// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt ) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// package io import ( diff --git a/internal/db/storage/blob/s3/reader/reader_mock.go b/internal/db/storage/blob/s3/reader/reader_mock.go index 446ec5ca16..417f1ed128 100644 --- a/internal/db/storage/blob/s3/reader/reader_mock.go +++ b/internal/db/storage/blob/s3/reader/reader_mock.go @@ -1,3 +1,18 @@ +// +// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt ) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// package reader import ( From f4ec3e95b4ddc19960d36128b4b2de4608367c8b Mon Sep 17 00:00:00 2001 From: hlts2 Date: Thu, 1 Oct 2020 15:59:31 +0900 Subject: [PATCH 13/13] add pacakge comment Signed-off-by: hlts2 --- internal/db/storage/blob/s3/reader/reader_mock.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/db/storage/blob/s3/reader/reader_mock.go b/internal/db/storage/blob/s3/reader/reader_mock.go index 417f1ed128..3e30407e20 100644 --- a/internal/db/storage/blob/s3/reader/reader_mock.go +++ b/internal/db/storage/blob/s3/reader/reader_mock.go @@ -13,6 +13,9 @@ // See the License for the specific language governing permissions and // limitations under the License. // + +// Package reader provides the reader functions for handling with s3. +// This package is wrapping package of "https://github.com/aws/aws-sdk-go". package reader import ( @@ -36,7 +39,7 @@ func (m *MockS3API) GetObjectWithContext(ctx aws.Context, in *s3.GetObjectInput, return m.GetObjectWithContextFunc(ctx, in, opts...) } -// MockIO represents mock for io.IO +// MockIO represents mock for io.IO. type MockIO struct { NewReaderWithContextFunc func(ctx context.Context, r io.Reader) (io.Reader, error) NewReadCloserWithContextFunc func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error)