Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for internal/db/storage/blob/s3/reader. #718

Merged
merged 13 commits into from
Oct 2, 2020
46 changes: 46 additions & 0 deletions internal/db/storage/blob/s3/reader/io/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package io

import (
"context"
"io"

iio "github.com/vdaas/vald/internal/io"
)

// IO represents an interface to create object for io.
type IO interface {
NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error)
NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error)
}

type ctxio struct{}

// New returns IO implementation.
func New() IO {
return new(ctxio)
}

// NewReaderWithContext calls io.NewReaderWithContext.
func (*ctxio) NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) {
return iio.NewReaderWithContext(ctx, r)
}

// NewReadCloserWithContext calls io.NewReadCloserWithContext.
func (*ctxio) NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) {
return iio.NewReadCloserWithContext(ctx, r)
}
4 changes: 2 additions & 2 deletions internal/db/storage/blob/s3/reader/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package reader

import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
"github.com/vdaas/vald/internal/errgroup"
)

Expand All @@ -43,7 +43,7 @@ func WithErrGroup(eg errgroup.Group) Option {
}

// WithService returns the option to set the service.
func WithService(s *s3.S3) Option {
func WithService(s s3iface.S3API) Option {
return func(r *reader) {
if s != nil {
r.service = s
Expand Down
3 changes: 2 additions & 1 deletion internal/db/storage/blob/s3/reader/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/go-cmp/cmp"
"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"go.uber.org/goleak"
Expand Down Expand Up @@ -104,7 +105,7 @@ func TestWithErrGroup(t *testing.T) {
func TestWithService(t *testing.T) {
type T = reader
type args struct {
s *s3.S3
s s3iface.S3API
}
type want struct {
obj *T
Expand Down
19 changes: 14 additions & 5 deletions internal/db/storage/blob/s3/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,39 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/vdaas/vald/internal/backoff"
ctxio "github.com/vdaas/vald/internal/db/storage/blob/s3/reader/io"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
ctxio "github.com/vdaas/vald/internal/io"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
)

type reader struct {
eg errgroup.Group
service *s3.S3
service s3iface.S3API
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
bucket string
key string

pr io.ReadCloser
wg *sync.WaitGroup

ctxio ctxio.IO

backoffEnabled bool
backoffOpts []backoff.Option
maxChunkSize int64
}

// Reader is an interface that groups the basic Read and Close and Open methods.
type Reader interface {
Open(ctx context.Context) error
io.ReadCloser
}

// New returns Reader implementation.
func New(opts ...Option) Reader {
r := new(reader)
for _, opt := range append(defaultOpts, opts...) {
Expand All @@ -63,6 +68,8 @@ func New(opts ...Option) Reader {
return r
}

// Open creates io.Pipe. After reading the data from s3, make it available with Read method.
// Open method returns an error to align the interface, but it doesn't actually return an error.
func (r *reader) Open(ctx context.Context) (err error) {
var pw io.WriteCloser

Expand Down Expand Up @@ -94,7 +101,7 @@ func (r *reader) Open(ctx context.Context) (err error) {
return err
}

body, err = ctxio.NewReaderWithContext(ctx, body)
body, err = r.ctxio.NewReaderWithContext(ctx, body)
if err != nil {
return err
}
Expand Down Expand Up @@ -164,7 +171,7 @@ func (r *reader) getObject(ctx context.Context, offset, length int64) (io.Reader
return nil, err
}

res, err := ctxio.NewReadCloserWithContext(ctx, resp.Body)
res, err := r.ctxio.NewReadCloserWithContext(ctx, resp.Body)
if err != nil {
return nil, err
}
Expand All @@ -186,6 +193,7 @@ func (r *reader) getObject(ctx context.Context, offset, length int64) (io.Reader
return buf, nil
}

// Close closes the reader.
func (r *reader) Close() error {
if r.pr != nil {
return r.pr.Close()
Expand All @@ -198,6 +206,7 @@ func (r *reader) Close() error {
return nil
}

// Read reads up to len(p) bytes and returns the number of bytes read.
func (r *reader) Read(p []byte) (n int, err error) {
if r.pr == nil {
return 0, errors.ErrStorageReaderNotOpened
Expand Down
72 changes: 72 additions & 0 deletions internal/db/storage/blob/s3/reader/reader_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package reader provides the reader functions for handling with s3.
// This package is wrapping package of "https://github.com/aws/aws-sdk-go".
package reader
kevindiu marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"io"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
)

// MockS3API represents mock for s3iface.MMockS3API.
type MockS3API struct {
s3iface.S3API
GetObjectWithContextFunc func(aws.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error)
}

// GetObjectWithContext calls GetObjectWithContextFunc.
func (m *MockS3API) GetObjectWithContext(ctx aws.Context, in *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) {
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
return m.GetObjectWithContextFunc(ctx, in, opts...)
}

// MockIO represents mock for io.IO.
type MockIO struct {
NewReaderWithContextFunc func(ctx context.Context, r io.Reader) (io.Reader, error)
NewReadCloserWithContextFunc func(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error)
}

// NewReaderWithContext calls NewReaderWithContextFunc.
func (m *MockIO) NewReaderWithContext(ctx context.Context, r io.Reader) (io.Reader, error) {
return m.NewReaderWithContextFunc(ctx, r)
}

// NewReadCloserWithContext calls NewReadCloserWithContextFunc.
func (m *MockIO) NewReadCloserWithContext(ctx context.Context, r io.ReadCloser) (io.ReadCloser, error) {
return m.NewReadCloserWithContextFunc(ctx, r)
}

// MockReadCloser represents mock for io.ReadCloser.
type MockReadCloser struct {
ReadFunc func(p []byte) (n int, err error)
CloseFunc func() error
}

// Read calls ReadFunc.
func (m *MockReadCloser) Read(p []byte) (n int, err error) {
return m.ReadFunc(p)
}

// Close calls CloseFunc.
func (m *MockReadCloser) Close() error {
return m.CloseFunc()
}
Loading