From b5e700a8f00ec8ca138c1722d442272de63210ea Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Mon, 6 Jul 2020 16:28:28 +0900 Subject: [PATCH 01/14] refactor load test --- cmd/tools/cli/loadtest/sample.yaml | 11 +- pkg/tools/cli/loadtest/assets/loader.go | 8 +- pkg/tools/cli/loadtest/config/config.go | 68 ++++++-- pkg/tools/cli/loadtest/service/insert.go | 145 ++++++++++++++-- pkg/tools/cli/loadtest/service/insert_test.go | 6 +- pkg/tools/cli/loadtest/service/loader.go | 161 ++++++++++++------ .../cli/loadtest/service/loader_option.go | 25 ++- pkg/tools/cli/loadtest/service/loader_test.go | 12 +- pkg/tools/cli/loadtest/service/search.go | 63 +++++-- pkg/tools/cli/loadtest/service/search_test.go | 6 +- pkg/tools/cli/loadtest/usecase/load.go | 6 +- 11 files changed, 398 insertions(+), 113 deletions(-) diff --git a/cmd/tools/cli/loadtest/sample.yaml b/cmd/tools/cli/loadtest/sample.yaml index 4675381147..509a61b327 100644 --- a/cmd/tools/cli/loadtest/sample.yaml +++ b/cmd/tools/cli/loadtest/sample.yaml @@ -19,12 +19,15 @@ version: v0.0.0 time_zone: JST logging: logger: glg - level: debug - format: json -method: insert + level: info + format: raw +service: agent +operation: insert dataset: fashion-mnist concurrency: 100 -addr: "localhost:8081" +batch_size: 10 +progress_duration: 3s +addr: "localhost:8082" client: addrs: [] health_check_duration: 1s diff --git a/pkg/tools/cli/loadtest/assets/loader.go b/pkg/tools/cli/loadtest/assets/loader.go index 9e3b154fa2..74afcada8e 100644 --- a/pkg/tools/cli/loadtest/assets/loader.go +++ b/pkg/tools/cli/loadtest/assets/loader.go @@ -92,14 +92,14 @@ func Load(path string) (train, test, distances [][]float32, neighbors [][]int, d }() trainDim, v1, err := loadDataset(f, "train", loadFloat32) if err != nil { - log.Error("couldn't load train dataset for path %s: %s", path, err) + log.Errorf("couldn't load train dataset for path %s: %s", path, err) return nil, nil, nil, nil, 0, err } train = v1.([][]float32) dim = trainDim testDim, v2, err := loadDataset(f, "test", loadFloat32) if err != nil { - log.Error("couldn't load test dataset for path %s: %s", path, err) + log.Errorf("couldn't load test dataset for path %s: %s", path, err) return train, nil, nil, nil, dim, err } test = v2.([][]float32) @@ -108,14 +108,14 @@ func Load(path string) (train, test, distances [][]float32, neighbors [][]int, d } distancesDim, v3, err := loadDataset(f, "distances", loadFloat32) if err != nil { - log.Error("couldn't load distances dataset for path %s: %s", path, err) + log.Errorf("couldn't load distances dataset for path %s: %s", path, err) return train, test, nil, nil, dim, err } distances = v3.([][]float32) neighborsDim, v4, err := loadDataset(f, "neighbors", loadInt) if err != nil { - log.Error("couldn't load neighbors dataset for path %s: %s", path, err) + log.Errorf("couldn't load neighbors dataset for path %s: %s", path, err) return train, test, distances, nil, trainDim, err } neighbors = v4.([][]int) diff --git a/pkg/tools/cli/loadtest/config/config.go b/pkg/tools/cli/loadtest/config/config.go index bbf9429449..96b6ae3698 100644 --- a/pkg/tools/cli/loadtest/config/config.go +++ b/pkg/tools/cli/loadtest/config/config.go @@ -26,37 +26,80 @@ import ( // GlobalConfig is type alias of config.GlobalConfig. type GlobalConfig = config.GlobalConfig -// Operation is type of implemented load test. +// Operation is operation type of implemented load test. type Operation uint8 -// Operation method. +// Operation method definition. const ( - Unknown Operation = iota + UnknownOperation Operation = iota Insert + BulkInsert + StreamInsert Search + StreamSearch ) -// OperationMethod convert string to Operation. +// OperationMethod converts string to Operation. func OperationMethod(s string) Operation { switch strings.ToLower(s) { case "insert": return Insert + case "bulkinsert": + return BulkInsert + case "streaminsert": + return StreamInsert case "search": return Search + case "streamsearch": + return StreamSearch default: - return Unknown + return UnknownOperation } } -// String convert Operation to string. +// String converts Operation to string. func (o Operation) String() string { switch o { case Insert: - return "insert" + return "Insert" case Search: - return "search" + return "Search" default: - return "unknown operation" + return "Unknown operation" + } +} + +// Service is service type of implemented load test. +type Service uint8 + +// Service definitions. +const ( + UnknownService Service = iota + Agent + Gateway +) + +// ServiceMethod converts string to Service. +func ServiceMethod(s string) Service { + switch strings.ToLower(s) { + case "agent": + return Agent + case "gateway": + return Gateway + default: + return UnknownService + } +} + +// String converts Service to string. +func (s Service) String() string { + switch s { + case Agent: + return "Agent" + case Gateway: + return "Gateway" + default: + return "Unknown service" } } @@ -65,9 +108,11 @@ func (o Operation) String() string { type Data struct { config.GlobalConfig `json:",inline" yaml:",inline"` Addr string `json:"addr" yaml:"addr"` - Method string `json:"method" yaml:"method"` + Service string `json:"service" yaml:"service"` + Operation string `json:"operation" yaml:"operation"` Dataset string `json:"dataset" yaml:"dataset"` Concurrency int `json:"concurrency" yaml:"concurrency"` + BatchSize int `json:"batch_size" yaml:"batch_size"` ProgressDuration string `json:"progress_duration" yaml:"progress_duration"` Client *config.GRPCClient `json:"client" yaml:"client"` } @@ -88,9 +133,10 @@ func NewConfig(path string) (cfg *Data, err error) { } cfg.Addr = config.GetActualValue(cfg.Addr) - cfg.Method = config.GetActualValue(cfg.Method) + cfg.Operation = config.GetActualValue(cfg.Operation) cfg.Dataset = config.GetActualValue(cfg.Dataset) cfg.ProgressDuration = config.GetActualValue(cfg.ProgressDuration) + cfg.Service = config.GetActualValue(cfg.Service) return cfg, nil } diff --git a/pkg/tools/cli/loadtest/service/insert.go b/pkg/tools/cli/loadtest/service/insert.go index 16175262a4..012186e351 100644 --- a/pkg/tools/cli/loadtest/service/insert.go +++ b/pkg/tools/cli/loadtest/service/insert.go @@ -17,28 +17,143 @@ package service import ( "context" + "sync" + "github.com/vdaas/vald/apis/grpc/agent/core" "github.com/vdaas/vald/apis/grpc/gateway/vald" "github.com/vdaas/vald/apis/grpc/payload" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" ) -func newInsert() (requestFunc, loaderFunc) { - return func(dataset assets.Dataset) ([]interface{}, error) { - vectors := dataset.Train() - ids := dataset.IDs() - requests := make([]interface{}, len(vectors)) - for j, v := range vectors { - requests[j] = &payload.Object_Vector{ - Id: ids[j], - Vector: v, - } +func insertRequestProvider(dataset assets.Dataset, batchSize int) (f func() interface{}, size int, err error) { + switch { + case batchSize == 1: + f, size = objectVectorProvider(dataset) + case batchSize >= 2: + f, size = objectVectorsProvider(dataset, batchSize) + default: + err = errors.New("batch size must be natural number.") + } + if err != nil { + return nil, 0, err + } + return f, size, nil +} + +func objectVectorProvider(dataset assets.Dataset) (func() interface{}, int) { + v := dataset.Train() + ids := dataset.IDs() + i := 0 + size := len(v) + m := &sync.Mutex{} + return func() (ret interface{}) { + m.Lock() + defer m.Unlock() + if i < size { + ret = &payload.Object_Vector{ + Id: ids[i], + Vector: v[i], + } + i++ + } + return ret + }, size +} + +func objectVectorsProvider(dataset assets.Dataset, n int) (func() interface{}, int) { + provider, s := objectVectorProvider(dataset) + size := s / n + if s % n != 0 { + size = size + 1 + } + return func() (ret interface{}) { + v := make([]*payload.Object_Vector, 0, n) + for i := 0; i < n; i++ { + d := provider() + if d == nil { + break } - return requests, nil - }, - func(ctx context.Context, c vald.ValdClient, i interface{}, copts ...grpc.CallOption) error { - _, err := c.Insert(ctx, i.(*payload.Object_Vector), copts...) - return err + v = append(v, d.(*payload.Object_Vector)) + } + return &payload.Object_Vectors{ + Vectors: v, + } + }, size +} + +type inserter interface { + Insert(context.Context, *payload.Object_Vector, ...grpc.CallOption) (*payload.Empty, error) + MultiInsert(context.Context, *payload.Object_Vectors, ...grpc.CallOption) (*payload.Empty, error) +} + +func agent(conn *grpc.ClientConn) inserter { + return core.NewAgentClient(conn) +} + +func gateway(conn *grpc.ClientConn) inserter { + return vald.NewValdClient(conn) +} + +func insert(c func(*grpc.ClientConn) inserter) loadFunc { + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return c(conn).Insert(ctx, i.(*payload.Object_Vector), copts...) + } +} + +func bulkInsert(c func(*grpc.ClientConn) inserter) loadFunc { + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return c(conn).MultiInsert(ctx, i.(*payload.Object_Vectors), copts...) + } +} + +func (l *loader) newInsert() (f loadFunc, err error) { + switch { + case l.batchSize == 1: + switch l.service { + case config.Agent: + f = insert(agent) + case config.Gateway: + f = insert(gateway) + default: + err = errors.Errorf("undefined service: %s", l.service.String()) + } + case l.batchSize >= 2: + switch l.service { + case config.Agent: + f = bulkInsert(agent) + case config.Gateway: + f = bulkInsert(gateway) + default: + err = errors.Errorf("undefined service: %s", l.service.String()) } + default: + err = errors.Errorf("batch size must be natural number.") + } + if err != nil { + return nil, err + } + return f, nil } + +func (l *loader) newStreamInsert() (f loadFunc, err error) { + l.batchSize = 1 + switch l.service { + case config.Agent: + f = func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return core.NewAgentClient(conn).StreamInsert(ctx, copts...) + } + case config.Gateway: + f = func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewValdClient(conn).StreamInsert(ctx, copts...) + } + default: + err = errors.Errorf("undefined service: %s", l.service.String()) + } + if err != nil { + return nil, err + } + return f, nil +} \ No newline at end of file diff --git a/pkg/tools/cli/loadtest/service/insert_test.go b/pkg/tools/cli/loadtest/service/insert_test.go index 6424701519..46f0cf1a1b 100644 --- a/pkg/tools/cli/loadtest/service/insert_test.go +++ b/pkg/tools/cli/loadtest/service/insert_test.go @@ -26,16 +26,16 @@ import ( func Test_newInsert(t *testing.T) { type want struct { want requestFunc - want1 loaderFunc + want1 loadFunc } type test struct { name string want want - checkFunc func(want, requestFunc, loaderFunc) error + checkFunc func(want, requestFunc, loadFunc) error beforeFunc func() afterFunc func() } - defaultCheckFunc := func(w want, got requestFunc, got1 loaderFunc) error { + defaultCheckFunc := func(w want, got requestFunc, got1 loadFunc) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want %v", got, w.want) } diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 842c181ddd..657c879b3b 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -23,7 +23,7 @@ import ( "syscall" "time" - "github.com/vdaas/vald/apis/grpc/gateway/vald" + "github.com/vdaas/vald/apis/grpc/payload" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" @@ -40,8 +40,7 @@ type Loader interface { } type ( - requestFunc func(assets.Dataset) ([]interface{}, error) - loaderFunc func(context.Context, vald.ValdClient, interface{}, ...grpc.CallOption) error + loadFunc func(context.Context, *grpc.ClientConn, interface{}, ...grpc.CallOption) (interface{}, error) ) type loader struct { @@ -49,11 +48,13 @@ type loader struct { client grpc.Client addr string concurrency int + batchSize int dataset string - requests []interface{} progressDuration time.Duration - requestsFunc requestFunc - loaderFunc loaderFunc + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service operation config.Operation } @@ -66,13 +67,21 @@ func NewLoader(opts ...Option) (Loader, error) { } } + var err error switch l.operation { case config.Insert: - l.requestsFunc, l.loaderFunc = newInsert() + l.loaderFunc, err = l.newInsert() + case config.StreamInsert: + l.loaderFunc, err = l.newStreamInsert() case config.Search: - l.requestsFunc, l.loaderFunc = newSearch() + l.loaderFunc, err = l.newSearch() + case config.StreamSearch: + l.loaderFunc, err = l.newStreamSearch() default: - return nil, errors.Errorf("undefined method: %v", l.operation) + err = errors.Errorf("undefined method: %s", l.operation.String()) + } + if err != nil { + return nil, err } return l, nil @@ -88,48 +97,64 @@ func (l *loader) Prepare(context.Context) (err error) { if err != nil { return err } - l.requests, err = l.requestsFunc(dataset) + + switch l.operation { + case config.Insert, config.StreamInsert: + l.dataProvider, l.dataSize, err = insertRequestProvider(dataset, l.batchSize) + case config.Search, config.StreamSearch: + l.dataProvider, l.dataSize, err = searchRequestProvider(dataset) + } if err != nil { return err } - if len(l.requests) == 0 { - return errors.New("prepare data is empty") - } - return nil } // Do operates load test. func (l *loader) Do(ctx context.Context) <-chan error { - ech := make(chan error, len(l.requests)) - - // TODO: related to #403. - p, err := os.FindProcess(os.Getpid()) - if err != nil { + ech := make(chan error, l.dataSize) + finalize := func(err error) { select { case <-ctx.Done(): ech <- errors.Wrap(err, ctx.Err().Error()) case ech <- err: } + } + + // TODO: related to #403. + p, err := os.FindProcess(os.Getpid()) + if err != nil { + finalize(err) return ech } - var pgCnt int32 = 0 - var start time.Time + var pgCnt, errCnt int32 = 0, 0 + var start, end time.Time + vps := func(count int, t1, t2 time.Time) float64 { + return float64(count) / t2.Sub(t1).Seconds() + } progress := func() { - log.Infof("progress %d items, %f[qps]", pgCnt, float64(pgCnt)/time.Now().Sub(start).Seconds()) + log.Infof("progress %d requests, %f[vps]", pgCnt, vps(int(pgCnt)*l.batchSize, start, time.Now())) } + + f := func(i interface{}, err error) { + atomic.AddInt32(&pgCnt, 1) + if err != nil { + atomic.AddInt32(&errCnt, 1) + } + } + + ticker := time.NewTicker(l.progressDuration) l.eg.Go(safety.RecoverFunc(func() error { - ticker := time.NewTicker(l.progressDuration) defer ticker.Stop() - for pgCnt != int32(len(l.requests)) { + for int(pgCnt) < l.dataSize { if err := func() error { - defer progress() select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: + progress() return nil } }(); err != nil { @@ -138,44 +163,78 @@ func (l *loader) Do(ctx context.Context) <-chan error { } return nil })) - start = time.Now() - var errCnt int32 = 0 + l.eg.Go(safety.RecoverFunc(func() error { + log.Infof("start load test(%s, %s)", l.service.String(), l.operation.String()) defer close(ech) eg, egctx := errgroup.New(ctx) - eg.Limitation(l.concurrency) - for _, req := range l.requests { - r := req - eg.Go(safety.RecoverFunc( - func() error { - _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - err := l.loaderFunc(ctx, vald.NewValdClient(conn), r, copts...) - atomic.AddInt32(&pgCnt, 1) - if err != nil { - log.Warn("an error occurred while executing loaderfunc:", err) - atomic.AddInt32(&errCnt, 1) - } + switch l.operation { + case config.StreamInsert, config.StreamSearch: + var newData func() interface{} + switch l.operation { + case config.StreamInsert: + newData = func() interface{} { + return new(payload.Empty) + } + case config.StreamSearch: + newData = func() interface{} { + return new(payload.Search_Response) + } + } + + start = time.Now() + eg.Go(safety.RecoverFunc(func() error { + _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + st, err := l.loaderFunc(ctx, conn, nil, copts...) + if err != nil { return nil, err + } + return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) + }) + if err != nil { + finalize(err) + } + return nil + })) + err = eg.Wait() + end = time.Now() + case config.Insert, config.Search: + eg.Limitation(l.concurrency) + start = time.Now() + for { + r := l.dataProvider() + if r == nil { + break + } + eg.Go(safety.RecoverFunc(func() error { + _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + res, err := l.loaderFunc(ctx, conn, r, copts...) + f(res, err) + return res, err }) if err != nil { - select { - case <-ctx.Done(): - ech <- errors.Wrap(err, ctx.Err().Error()) - case ech <- err: - } + finalize(err) } return nil })) - } - if err := eg.Wait(); err != nil { - select { - case <-ctx.Done(): - ech <- errors.Wrap(err, ctx.Err().Error()) - case ech <- err: } + err = eg.Wait() + end = time.Now() + default: + err = errors.Errorf("undefined type: %s", l.operation.String()) + } + ticker.Stop() + + if errCnt > 0 { + log.Warnf("Error ratio: %.2f%%", float64(errCnt)/float64(pgCnt)*100) + err = errors.Errorf("insert failure: %d", errCnt) + } + if err != nil { + finalize(err) return p.Signal(syscall.SIGKILL) // TODO: #403 } - log.Infof("Error ratio: %.2f%%", float64(errCnt)/float64(pgCnt)*100) + log.Infof("result:%s\t%d\t%d\t%f", l.service.String(), l.concurrency, l.batchSize, vps(int(pgCnt) * l.batchSize, start, end)) + return p.Signal(syscall.SIGTERM) // TODO: #403 })) return ech diff --git a/pkg/tools/cli/loadtest/service/loader_option.go b/pkg/tools/cli/loadtest/service/loader_option.go index 23072dc69c..77032d0ef4 100644 --- a/pkg/tools/cli/loadtest/service/loader_option.go +++ b/pkg/tools/cli/loadtest/service/loader_option.go @@ -29,6 +29,7 @@ type Option func(*loader) error var ( defaultOpts = []Option{ WithConcurrency(100), + WithBatchSize(1), WithErrGroup(errgroup.Get()), WithProgressDuration("5s"), } @@ -56,9 +57,21 @@ func WithClient(c grpc.Client) Option { // WithConcurrency sets load test concurrency. func WithConcurrency(c int) Option { return func(l *loader) error { - if c > 0 { - l.concurrency = c + if c <= 0 { + return errors.Errorf("concurrency must be natural number") } + l.concurrency = c + return nil + } +} + +// WithBatchSize sets load test batch size. +func WithBatchSize(b int) Option { + return func(l *loader) error { + if b <= 0 { + return errors.Errorf("batch size must be natural number") + } + l.batchSize = b return nil } } @@ -103,3 +116,11 @@ func WithOperation(op string) Option { return nil } } + +// WithService sets service of load test. +func WithService(s string) Option { + return func(l *loader) error { + l.service = config.ServiceMethod(s) + return nil + } +} diff --git a/pkg/tools/cli/loadtest/service/loader_test.go b/pkg/tools/cli/loadtest/service/loader_test.go index c0c66fd757..4e04385277 100644 --- a/pkg/tools/cli/loadtest/service/loader_test.go +++ b/pkg/tools/cli/loadtest/service/loader_test.go @@ -116,7 +116,7 @@ func Test_loader_Prepare(t *testing.T) { requests []interface{} progressDuration time.Duration requestsFunc requestFunc - loaderFunc loaderFunc + loaderFunc loadFunc operation config.Operation } type want struct { @@ -154,7 +154,7 @@ func Test_loader_Prepare(t *testing.T) { requests: nil, progressDuration: nil, requestsFunc: nil, - loaderFunc: nil, + loadFunc: nil, operation: nil, }, want: want{}, @@ -179,7 +179,7 @@ func Test_loader_Prepare(t *testing.T) { requests: nil, progressDuration: nil, requestsFunc: nil, - loaderFunc: nil, + loadFunc: nil, operation: nil, }, want: want{}, @@ -236,7 +236,7 @@ func Test_loader_Do(t *testing.T) { requests []interface{} progressDuration time.Duration requestsFunc requestFunc - loaderFunc loaderFunc + loaderFunc loadFunc operation config.Operation } type want struct { @@ -274,7 +274,7 @@ func Test_loader_Do(t *testing.T) { requests: nil, progressDuration: nil, requestsFunc: nil, - loaderFunc: nil, + loadFunc: nil, operation: nil, }, want: want{}, @@ -299,7 +299,7 @@ func Test_loader_Do(t *testing.T) { requests: nil, progressDuration: nil, requestsFunc: nil, - loaderFunc: nil, + loadFunc: nil, operation: nil, }, want: want{}, diff --git a/pkg/tools/cli/loadtest/service/search.go b/pkg/tools/cli/loadtest/service/search.go index e4f129b3fc..ad3b08a8e0 100644 --- a/pkg/tools/cli/loadtest/service/search.go +++ b/pkg/tools/cli/loadtest/service/search.go @@ -17,26 +17,63 @@ package service import ( "context" + "sync" + "github.com/vdaas/vald/apis/grpc/agent/core" "github.com/vdaas/vald/apis/grpc/gateway/vald" "github.com/vdaas/vald/apis/grpc/payload" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" ) -func newSearch() (requestFunc, loaderFunc) { - return func(dataset assets.Dataset) ([]interface{}, error) { - vectors := dataset.Query() - requests := make([]interface{}, len(vectors)) - for j, v := range vectors { - requests[j] = &payload.Search_Request{ - Vector: v, - } +func searchRequestProvider(dataset assets.Dataset) (func() interface{}, int, error) { + v := dataset.Query() + size := len(v) + i := 0 + m := sync.Mutex{} + return func() (ret interface{}) { + m.Lock() + defer m.Unlock() + if i < size { + ret = &payload.Search_Request{ + Vector: v[i], } - return requests, nil - }, - func(ctx context.Context, c vald.ValdClient, i interface{}, copts ...grpc.CallOption) error { - _, err := c.Search(ctx, i.(*payload.Search_Request), copts...) - return err + i++ } + return ret + }, size, nil } + + + +func (l *loader) newSearch() (loadFunc, error) { + switch l.service { + case config.Agent: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return core.NewAgentClient(conn).Search(ctx, i.(*payload.Search_Request), copts...) + }, nil + case config.Gateway: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewValdClient(conn).Search(ctx, i.(*payload.Search_Request), copts...) + }, nil + default: + return nil, errors.Errorf("undefined service: %s", l.service.String()) + } +} + +func (l *loader) newStreamSearch() (loadFunc, error) { + switch l.service { + case config.Agent: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return core.NewAgentClient(conn).StreamSearch(ctx, copts...) + }, nil + case config.Gateway: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewValdClient(conn).StreamSearch(ctx, copts...) + }, nil + default: + return nil, errors.Errorf("undefined service: %s", l.service.String()) + } +} \ No newline at end of file diff --git a/pkg/tools/cli/loadtest/service/search_test.go b/pkg/tools/cli/loadtest/service/search_test.go index 5504c779c2..a87b3b6046 100644 --- a/pkg/tools/cli/loadtest/service/search_test.go +++ b/pkg/tools/cli/loadtest/service/search_test.go @@ -26,16 +26,16 @@ import ( func Test_newSearch(t *testing.T) { type want struct { want requestFunc - want1 loaderFunc + want1 loadFunc } type test struct { name string want want - checkFunc func(want, requestFunc, loaderFunc) error + checkFunc func(want, requestFunc, loadFunc) error beforeFunc func() afterFunc func() } - defaultCheckFunc := func(w want, got requestFunc, got1 loaderFunc) error { + defaultCheckFunc := func(w want, got requestFunc, got1 loadFunc) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want %v", got, w.want) } diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index 7f78fb3d25..92a917c908 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -49,12 +49,14 @@ func New(cfg *config.Data) (r runner.Runner, err error) { run.client = grpc.New(clientOpts...) run.loader, err = service.NewLoader( - service.WithOperation(cfg.Method), + service.WithOperation(cfg.Operation), service.WithAddr(cfg.Addr), + service.WithBatchSize(cfg.BatchSize), service.WithDataset(cfg.Dataset), service.WithClient(run.client), service.WithConcurrency(cfg.Concurrency), service.WithProgressDuration(cfg.ProgressDuration), + service.WithService(cfg.Service), ) if err != nil { return nil, err @@ -74,7 +76,9 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { if err != nil { return nil, err } + lech := r.loader.Do(ctx) + ech := make(chan error, 1000) // TODO: fix magic number r.eg.Go(safety.RecoverFunc(func() (err error) { defer close(ech) From 25095f66ca80bb027c9429bba63399a0fe02988d Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Mon, 6 Jul 2020 16:44:04 +0900 Subject: [PATCH 02/14] fix format Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/service/insert.go | 6 +++--- pkg/tools/cli/loadtest/service/loader.go | 2 +- pkg/tools/cli/loadtest/service/search.go | 4 +--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/insert.go b/pkg/tools/cli/loadtest/service/insert.go index 012186e351..fb70bd0ab6 100644 --- a/pkg/tools/cli/loadtest/service/insert.go +++ b/pkg/tools/cli/loadtest/service/insert.go @@ -66,7 +66,7 @@ func objectVectorProvider(dataset assets.Dataset) (func() interface{}, int) { func objectVectorsProvider(dataset assets.Dataset, n int) (func() interface{}, int) { provider, s := objectVectorProvider(dataset) size := s / n - if s % n != 0 { + if s%n != 0 { size = size + 1 } return func() (ret interface{}) { @@ -138,7 +138,7 @@ func (l *loader) newInsert() (f loadFunc, err error) { return f, nil } -func (l *loader) newStreamInsert() (f loadFunc, err error) { +func (l *loader) newStreamInsert() (f loadFunc, err error) { l.batchSize = 1 switch l.service { case config.Agent: @@ -156,4 +156,4 @@ func (l *loader) newStreamInsert() (f loadFunc, err error) { return nil, err } return f, nil -} \ No newline at end of file +} diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 657c879b3b..3dc5f78234 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -233,7 +233,7 @@ func (l *loader) Do(ctx context.Context) <-chan error { finalize(err) return p.Signal(syscall.SIGKILL) // TODO: #403 } - log.Infof("result:%s\t%d\t%d\t%f", l.service.String(), l.concurrency, l.batchSize, vps(int(pgCnt) * l.batchSize, start, end)) + log.Infof("result:%s\t%d\t%d\t%f", l.service.String(), l.concurrency, l.batchSize, vps(int(pgCnt)*l.batchSize, start, end)) return p.Signal(syscall.SIGTERM) // TODO: #403 })) diff --git a/pkg/tools/cli/loadtest/service/search.go b/pkg/tools/cli/loadtest/service/search.go index ad3b08a8e0..53f1f9bedc 100644 --- a/pkg/tools/cli/loadtest/service/search.go +++ b/pkg/tools/cli/loadtest/service/search.go @@ -46,8 +46,6 @@ func searchRequestProvider(dataset assets.Dataset) (func() interface{}, int, err }, size, nil } - - func (l *loader) newSearch() (loadFunc, error) { switch l.service { case config.Agent: @@ -76,4 +74,4 @@ func (l *loader) newStreamSearch() (loadFunc, error) { default: return nil, errors.Errorf("undefined service: %s", l.service.String()) } -} \ No newline at end of file +} From 76735d0a56396fb4ddf91da2ecd1b9f072bd020d Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Wed, 8 Jul 2020 11:10:08 +0900 Subject: [PATCH 03/14] use raw grpc client Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/config/config.go | 4 +++ pkg/tools/cli/loadtest/service/insert.go | 3 ++ pkg/tools/cli/loadtest/service/loader.go | 44 ++++++++++++++++++++++-- pkg/tools/cli/loadtest/usecase/load.go | 5 ++- 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/pkg/tools/cli/loadtest/config/config.go b/pkg/tools/cli/loadtest/config/config.go index 96b6ae3698..1ff7e6bea0 100644 --- a/pkg/tools/cli/loadtest/config/config.go +++ b/pkg/tools/cli/loadtest/config/config.go @@ -62,8 +62,12 @@ func (o Operation) String() string { switch o { case Insert: return "Insert" + case StreamInsert: + return "StreamInsert" case Search: return "Search" + case StreamSearch: + return "StreamSearch" default: return "Unknown operation" } diff --git a/pkg/tools/cli/loadtest/service/insert.go b/pkg/tools/cli/loadtest/service/insert.go index fb70bd0ab6..de0a85344f 100644 --- a/pkg/tools/cli/loadtest/service/insert.go +++ b/pkg/tools/cli/loadtest/service/insert.go @@ -78,6 +78,9 @@ func objectVectorsProvider(dataset assets.Dataset, n int) (func() interface{}, i } v = append(v, d.(*payload.Object_Vector)) } + if len(v) == 0 { + return nil + } return &payload.Object_Vectors{ Vectors: v, } diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 3dc5f78234..bc7fd7b19c 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -27,10 +27,12 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc" + igrpc "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" + + "google.golang.org/grpc" // TODO: related to #557 ) // Loader is representation of load test @@ -45,7 +47,7 @@ type ( type loader struct { eg errgroup.Group - client grpc.Client + client igrpc.Client addr string concurrency int batchSize int @@ -135,7 +137,7 @@ func (l *loader) Do(ctx context.Context) <-chan error { return float64(count) / t2.Sub(t1).Seconds() } progress := func() { - log.Infof("progress %d requests, %f[vps]", pgCnt, vps(int(pgCnt)*l.batchSize, start, time.Now())) + log.Infof("progress %d requests, %f[vps], error: %d", pgCnt, vps(int(pgCnt)*l.batchSize, start, time.Now()), errCnt) } f := func(i interface{}, err error) { @@ -184,6 +186,8 @@ func (l *loader) Do(ctx context.Context) <-chan error { start = time.Now() eg.Go(safety.RecoverFunc(func() error { + // TODO: related to #557 + /* _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { st, err := l.loaderFunc(ctx, conn, nil, copts...) if err != nil { @@ -191,6 +195,26 @@ func (l *loader) Do(ctx context.Context) <-chan error { } return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) }) + */ + eg.Go(safety.RecoverFunc(func() error { + conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) + if err != nil { + finalize(err) + return nil + } + defer func() { + finalize(conn.Close()) + }() + st, err := l.loaderFunc(egctx, conn, nil) + if err != nil { + finalize(err) + return nil + } + if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { + finalize(err) + } + return nil + })) if err != nil { finalize(err) } @@ -207,14 +231,28 @@ func (l *loader) Do(ctx context.Context) <-chan error { break } eg.Go(safety.RecoverFunc(func() error { + // TODO: related to #557 + /* _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { res, err := l.loaderFunc(ctx, conn, r, copts...) f(res, err) return res, err }) + */ + conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) if err != nil { finalize(err) + return nil } + defer func() { + finalize(conn.Close()) + }() + res, err := l.loaderFunc(ctx, conn, r) + f(res, err) + if err != nil { + finalize(err) + } + return nil })) } diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index 92a917c908..dc36d9a2a5 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -72,10 +72,13 @@ func (r *run) PreStart(ctx context.Context) (err error) { // Start runs load test and returns error if occurred. func (r *run) Start(ctx context.Context) (<-chan error, error) { + // TODO: related to #557 + /* rech, err := r.client.StartConnectionMonitor(ctx) if err != nil { return nil, err } + */ lech := r.loader.Do(ctx) @@ -100,7 +103,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { select { case <-ctx.Done(): return finalize() - case err = <-rech: + //case err = <-rech: // TODO: related to #557 case err = <-lech: } if err != nil { From 095081d86bd9571eda9b205c876c0e397cad270f Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Wed, 8 Jul 2020 11:12:04 +0900 Subject: [PATCH 04/14] gofmt Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/service/loader.go | 28 ++++++++++++------------ pkg/tools/cli/loadtest/usecase/load.go | 10 ++++----- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index bc7fd7b19c..b0db2482ba 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -188,14 +188,14 @@ func (l *loader) Do(ctx context.Context) <-chan error { eg.Go(safety.RecoverFunc(func() error { // TODO: related to #557 /* - _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - st, err := l.loaderFunc(ctx, conn, nil, copts...) - if err != nil { - return nil, err - } - return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) - }) - */ + _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + st, err := l.loaderFunc(ctx, conn, nil, copts...) + if err != nil { + return nil, err + } + return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) + }) + */ eg.Go(safety.RecoverFunc(func() error { conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) if err != nil { @@ -233,12 +233,12 @@ func (l *loader) Do(ctx context.Context) <-chan error { eg.Go(safety.RecoverFunc(func() error { // TODO: related to #557 /* - _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - res, err := l.loaderFunc(ctx, conn, r, copts...) - f(res, err) - return res, err - }) - */ + _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + res, err := l.loaderFunc(ctx, conn, r, copts...) + f(res, err) + return res, err + }) + */ conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) if err != nil { finalize(err) diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index dc36d9a2a5..a189c440b1 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -74,11 +74,11 @@ func (r *run) PreStart(ctx context.Context) (err error) { func (r *run) Start(ctx context.Context) (<-chan error, error) { // TODO: related to #557 /* - rech, err := r.client.StartConnectionMonitor(ctx) - if err != nil { - return nil, err - } - */ + rech, err := r.client.StartConnectionMonitor(ctx) + if err != nil { + return nil, err + } + */ lech := r.loader.Do(ctx) From 634063e27107f9f07178ee6e2173a6a22b21ece8 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Wed, 8 Jul 2020 14:50:11 +0900 Subject: [PATCH 05/14] bug fix Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/config/config.go | 3 --- pkg/tools/cli/loadtest/service/loader.go | 32 ++++++++++-------------- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/pkg/tools/cli/loadtest/config/config.go b/pkg/tools/cli/loadtest/config/config.go index 1ff7e6bea0..6b81a124a8 100644 --- a/pkg/tools/cli/loadtest/config/config.go +++ b/pkg/tools/cli/loadtest/config/config.go @@ -33,7 +33,6 @@ type Operation uint8 const ( UnknownOperation Operation = iota Insert - BulkInsert StreamInsert Search StreamSearch @@ -44,8 +43,6 @@ func OperationMethod(s string) Operation { switch strings.ToLower(s) { case "insert": return Insert - case "bulkinsert": - return BulkInsert case "streaminsert": return StreamInsert case "search": diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index b0db2482ba..7191a49d50 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -80,7 +80,7 @@ func NewLoader(opts ...Option) (Loader, error) { case config.StreamSearch: l.loaderFunc, err = l.newStreamSearch() default: - err = errors.Errorf("undefined method: %s", l.operation.String()) + err = errors.Errorf("undefined operation: %s", l.operation.String()) } if err != nil { return nil, err @@ -196,27 +196,21 @@ func (l *loader) Do(ctx context.Context) <-chan error { return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) }) */ - eg.Go(safety.RecoverFunc(func() error { - conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) - if err != nil { - finalize(err) - return nil - } - defer func() { - finalize(conn.Close()) - }() - st, err := l.loaderFunc(egctx, conn, nil) - if err != nil { - finalize(err) - return nil - } - if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { - finalize(err) - } + conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) + if err != nil { + finalize(err) return nil - })) + } + defer func() { + finalize(conn.Close()) + }() + st, err := l.loaderFunc(egctx, conn, nil) if err != nil { finalize(err) + return nil + } + if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { + finalize(err) } return nil })) From 17d769243f8e01400a92c634a41c207386573b9b Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Wed, 29 Jul 2020 15:10:19 +0900 Subject: [PATCH 06/14] fix based on comments Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/service/loader.go | 2 +- pkg/tools/cli/loadtest/service/search.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 7191a49d50..6728f98ed5 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -241,7 +241,7 @@ func (l *loader) Do(ctx context.Context) <-chan error { defer func() { finalize(conn.Close()) }() - res, err := l.loaderFunc(ctx, conn, r) + res, err := l.loaderFunc(egctx, conn, r) f(res, err) if err != nil { finalize(err) diff --git a/pkg/tools/cli/loadtest/service/search.go b/pkg/tools/cli/loadtest/service/search.go index 53f1f9bedc..7aa16caa2d 100644 --- a/pkg/tools/cli/loadtest/service/search.go +++ b/pkg/tools/cli/loadtest/service/search.go @@ -32,7 +32,7 @@ func searchRequestProvider(dataset assets.Dataset) (func() interface{}, int, err v := dataset.Query() size := len(v) i := 0 - m := sync.Mutex{} + m := &sync.Mutex{} return func() (ret interface{}) { m.Lock() defer m.Unlock() From 54908e944c5e4ae44ac5e7f504eaa8a6bde06342 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Thu, 30 Jul 2020 13:04:41 +0900 Subject: [PATCH 07/14] fix based on comments Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/assets/loader.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/pkg/tools/cli/loadtest/assets/loader.go b/pkg/tools/cli/loadtest/assets/loader.go index 74afcada8e..916f5b5d70 100644 --- a/pkg/tools/cli/loadtest/assets/loader.go +++ b/pkg/tools/cli/loadtest/assets/loader.go @@ -20,7 +20,6 @@ import ( "github.com/kpango/fuid" "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/log" "gonum.org/v1/hdf5" ) @@ -84,23 +83,20 @@ func loadDataset(file *hdf5.File, name string, f loaderFunc) (dim int, vec inter func Load(path string) (train, test, distances [][]float32, neighbors [][]int, dim int, err error) { f, err := hdf5.OpenFile(path, hdf5.F_ACC_RDONLY) if err != nil { - log.Errorf("couldn't open file %s: %s", path, err) - return nil, nil, nil, nil, 0, err + return nil, nil, nil, nil, 0, errors.Wrapf(err, "couldn't open file %s", path) } defer func() { err = f.Close() }() trainDim, v1, err := loadDataset(f, "train", loadFloat32) if err != nil { - log.Errorf("couldn't load train dataset for path %s: %s", path, err) - return nil, nil, nil, nil, 0, err + return nil, nil, nil, nil, 0, errors.Wrapf(err, "couldn't load train dataset for path %s", path) } train = v1.([][]float32) dim = trainDim testDim, v2, err := loadDataset(f, "test", loadFloat32) if err != nil { - log.Errorf("couldn't load test dataset for path %s: %s", path, err) - return train, nil, nil, nil, dim, err + return train, nil, nil, nil, dim, errors.Wrapf(err, "couldn't load test dataset for path %s", path) } test = v2.([][]float32) if dim != testDim { @@ -108,15 +104,13 @@ func Load(path string) (train, test, distances [][]float32, neighbors [][]int, d } distancesDim, v3, err := loadDataset(f, "distances", loadFloat32) if err != nil { - log.Errorf("couldn't load distances dataset for path %s: %s", path, err) - return train, test, nil, nil, dim, err + return train, test, nil, nil, dim, errors.Wrapf(err, "couldn't load distances dataset for path %s", path) } distances = v3.([][]float32) neighborsDim, v4, err := loadDataset(f, "neighbors", loadInt) if err != nil { - log.Errorf("couldn't load neighbors dataset for path %s: %s", path, err) - return train, test, distances, nil, trainDim, err + return train, test, distances, nil, trainDim, errors.Wrapf(err, "couldn't load neighbors dataset for path %s", path) } neighbors = v4.([][]int) if distancesDim != neighborsDim { From 52028eb75c4c9f2124a62b907ca11c3455c31b59 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Thu, 30 Jul 2020 17:23:48 +0900 Subject: [PATCH 08/14] fix based on comments Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/service/insert.go | 12 ++++-------- pkg/tools/cli/loadtest/service/loader_option.go | 4 ++-- pkg/tools/cli/loadtest/service/search.go | 10 +++------- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/insert.go b/pkg/tools/cli/loadtest/service/insert.go index de0a85344f..f876173533 100644 --- a/pkg/tools/cli/loadtest/service/insert.go +++ b/pkg/tools/cli/loadtest/service/insert.go @@ -17,7 +17,7 @@ package service import ( "context" - "sync" + "sync/atomic" "github.com/vdaas/vald/apis/grpc/agent/core" "github.com/vdaas/vald/apis/grpc/gateway/vald" @@ -46,18 +46,14 @@ func insertRequestProvider(dataset assets.Dataset, batchSize int) (f func() inte func objectVectorProvider(dataset assets.Dataset) (func() interface{}, int) { v := dataset.Train() ids := dataset.IDs() - i := 0 + idx := int32(-1) size := len(v) - m := &sync.Mutex{} return func() (ret interface{}) { - m.Lock() - defer m.Unlock() - if i < size { + if i := int(atomic.AddInt32(&idx, 1)); i < size { ret = &payload.Object_Vector{ Id: ids[i], Vector: v[i], } - i++ } return ret }, size @@ -133,7 +129,7 @@ func (l *loader) newInsert() (f loadFunc, err error) { err = errors.Errorf("undefined service: %s", l.service.String()) } default: - err = errors.Errorf("batch size must be natural number.") + err = errors.New("batch size must be natural number.") } if err != nil { return nil, err diff --git a/pkg/tools/cli/loadtest/service/loader_option.go b/pkg/tools/cli/loadtest/service/loader_option.go index 77032d0ef4..731c45ebb6 100644 --- a/pkg/tools/cli/loadtest/service/loader_option.go +++ b/pkg/tools/cli/loadtest/service/loader_option.go @@ -58,7 +58,7 @@ func WithClient(c grpc.Client) Option { func WithConcurrency(c int) Option { return func(l *loader) error { if c <= 0 { - return errors.Errorf("concurrency must be natural number") + return errors.New("concurrency must be natural number") } l.concurrency = c return nil @@ -69,7 +69,7 @@ func WithConcurrency(c int) Option { func WithBatchSize(b int) Option { return func(l *loader) error { if b <= 0 { - return errors.Errorf("batch size must be natural number") + return errors.New("batch size must be natural number") } l.batchSize = b return nil diff --git a/pkg/tools/cli/loadtest/service/search.go b/pkg/tools/cli/loadtest/service/search.go index 7aa16caa2d..b23bc5c7f3 100644 --- a/pkg/tools/cli/loadtest/service/search.go +++ b/pkg/tools/cli/loadtest/service/search.go @@ -17,7 +17,7 @@ package service import ( "context" - "sync" + "sync/atomic" "github.com/vdaas/vald/apis/grpc/agent/core" "github.com/vdaas/vald/apis/grpc/gateway/vald" @@ -31,16 +31,12 @@ import ( func searchRequestProvider(dataset assets.Dataset) (func() interface{}, int, error) { v := dataset.Query() size := len(v) - i := 0 - m := &sync.Mutex{} + idx := int32(-1) return func() (ret interface{}) { - m.Lock() - defer m.Unlock() - if i < size { + if i := int(atomic.AddInt32(&idx, 1)); i < size { ret = &payload.Search_Request{ Vector: v[i], } - i++ } return ret }, size, nil From db2b185abd9525238bd01f1184ad9ac9724833f5 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Fri, 31 Jul 2020 11:14:45 +0900 Subject: [PATCH 09/14] fix error check Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/usecase/load.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index a189c440b1..1f8136480b 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -94,7 +94,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { } } err = ctx.Err() - if err != nil && err != context.Canceled { + if err != nil && errors.Is(err, context.Canceled) { errs = errors.Wrap(errs, err.Error()) } return errs From 45936bda755320dde9f0de34c24f4f1c27991ab2 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Fri, 31 Jul 2020 11:16:42 +0900 Subject: [PATCH 10/14] fix error check Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/usecase/load.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index 1f8136480b..958a0b933d 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -94,7 +94,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { } } err = ctx.Err() - if err != nil && errors.Is(err, context.Canceled) { + if err != nil && !errors.Is(err, context.Canceled) { errs = errors.Wrap(errs, err.Error()) } return errs From 48f837520e7234dac9be178312931b8f544acc2c Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Tue, 4 Aug 2020 15:19:52 +0900 Subject: [PATCH 11/14] refactor based on comment Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/service/loader.go | 176 +++++++++++------------ 1 file changed, 84 insertions(+), 92 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 6728f98ed5..93a1cc1b30 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -17,13 +17,13 @@ package service import ( "context" + "github.com/vdaas/vald/apis/grpc/payload" "os" "reflect" "sync/atomic" "syscall" "time" - "github.com/vdaas/vald/apis/grpc/payload" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" @@ -116,7 +116,7 @@ func (l *loader) Prepare(context.Context) (err error) { // Do operates load test. func (l *loader) Do(ctx context.Context) <-chan error { ech := make(chan error, l.dataSize) - finalize := func(err error) { + finalize := func(ctx context.Context, err error) { select { case <-ctx.Done(): ech <- errors.Wrap(err, ctx.Err().Error()) @@ -127,7 +127,7 @@ func (l *loader) Do(ctx context.Context) <-chan error { // TODO: related to #403. p, err := os.FindProcess(os.Getpid()) if err != nil { - finalize(err) + finalize(ctx, err) return ech } @@ -169,100 +169,16 @@ func (l *loader) Do(ctx context.Context) <-chan error { l.eg.Go(safety.RecoverFunc(func() error { log.Infof("start load test(%s, %s)", l.service.String(), l.operation.String()) defer close(ech) - eg, egctx := errgroup.New(ctx) - switch l.operation { - case config.StreamInsert, config.StreamSearch: - var newData func() interface{} - switch l.operation { - case config.StreamInsert: - newData = func() interface{} { - return new(payload.Empty) - } - case config.StreamSearch: - newData = func() interface{} { - return new(payload.Search_Response) - } - } - - start = time.Now() - eg.Go(safety.RecoverFunc(func() error { - // TODO: related to #557 - /* - _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - st, err := l.loaderFunc(ctx, conn, nil, copts...) - if err != nil { - return nil, err - } - return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) - }) - */ - conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) - if err != nil { - finalize(err) - return nil - } - defer func() { - finalize(conn.Close()) - }() - st, err := l.loaderFunc(egctx, conn, nil) - if err != nil { - finalize(err) - return nil - } - if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { - finalize(err) - } - return nil - })) - err = eg.Wait() - end = time.Now() - case config.Insert, config.Search: - eg.Limitation(l.concurrency) - start = time.Now() - for { - r := l.dataProvider() - if r == nil { - break - } - eg.Go(safety.RecoverFunc(func() error { - // TODO: related to #557 - /* - _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - res, err := l.loaderFunc(ctx, conn, r, copts...) - f(res, err) - return res, err - }) - */ - conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) - if err != nil { - finalize(err) - return nil - } - defer func() { - finalize(conn.Close()) - }() - res, err := l.loaderFunc(egctx, conn, r) - f(res, err) - if err != nil { - finalize(err) - } - - return nil - })) - } - err = eg.Wait() - end = time.Now() - default: - err = errors.Errorf("undefined type: %s", l.operation.String()) - } - ticker.Stop() + defer ticker.Stop() + start = time.Now() + err := l.do(ctx, f, finalize) + end = time.Now() if errCnt > 0 { log.Warnf("Error ratio: %.2f%%", float64(errCnt)/float64(pgCnt)*100) - err = errors.Errorf("insert failure: %d", errCnt) } if err != nil { - finalize(err) + finalize(ctx, err) return p.Signal(syscall.SIGKILL) // TODO: #403 } log.Infof("result:%s\t%d\t%d\t%f", l.service.String(), l.concurrency, l.batchSize, vps(int(pgCnt)*l.batchSize, start, end)) @@ -271,3 +187,79 @@ func (l *loader) Do(ctx context.Context) <-chan error { })) return ech } + +func (l *loader) do(ctx context.Context, f func(res interface{}, err error), notify func(context.Context, error)) (err error) { + eg, egctx := errgroup.New(ctx) + + switch l.operation { + case config.StreamInsert, config.StreamSearch: + var newData func() interface{} + switch l.operation { + case config.StreamInsert: + newData = func() interface{} { + return new(payload.Empty) + } + case config.StreamSearch: + newData = func() interface{} { + return new(payload.Search_Response) + } + } + eg.Go(safety.RecoverFunc(func() error { + // TODO: related to #557 + /* + _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + st, err := l.loaderFunc(ctx, conn, nil, copts...) + if err != nil { + return nil, err + } + return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) + }) + */ + conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) + if err != nil { + notify(egctx, err) + return nil + } + defer notify(egctx, conn.Close()) + st, err := l.loaderFunc(egctx, conn, nil) + if err != nil { + notify(egctx, err) + return nil + } + if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { + notify(egctx, err) + } + return nil + })) + err = eg.Wait() + case config.Insert, config.Search: + eg.Limitation(l.concurrency) + + for { + r := l.dataProvider() + if r == nil { + break + } + + eg.Go(safety.RecoverFunc(func() error { + conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) + if err != nil { + notify(egctx, err) + return nil + } + defer notify(egctx, conn.Close()) + + res, err := l.loaderFunc(egctx, conn, r) + f(res, err) + if err != nil { + notify(egctx, err) + } + return nil + })) + } + err = eg.Wait() + default: + err = errors.Errorf("undefined type: %s", l.operation.String()) + } + return +} \ No newline at end of file From 028925c93fd6f81d74c04e0913f74d9481a1e186 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Wed, 5 Aug 2020 16:53:23 +0900 Subject: [PATCH 12/14] refactor Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/service/loader.go | 29 +++++++++++++++--------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 93a1cc1b30..8def4b71f0 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -188,7 +188,7 @@ func (l *loader) Do(ctx context.Context) <-chan error { return ech } -func (l *loader) do(ctx context.Context, f func(res interface{}, err error), notify func(context.Context, error)) (err error) { +func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func(context.Context, error)) (err error) { eg, egctx := errgroup.New(ctx) switch l.operation { @@ -204,7 +204,13 @@ func (l *loader) do(ctx context.Context, f func(res interface{}, err error), not return new(payload.Search_Response) } } - eg.Go(safety.RecoverFunc(func() error { + eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + if err != nil { + notify(egctx, err) + err = nil + } + }() // TODO: related to #557 /* _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { @@ -217,17 +223,15 @@ func (l *loader) do(ctx context.Context, f func(res interface{}, err error), not */ conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) if err != nil { - notify(egctx, err) - return nil + return err } defer notify(egctx, conn.Close()) st, err := l.loaderFunc(egctx, conn, nil) if err != nil { - notify(egctx, err) - return nil + return err } if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { - notify(egctx, err) + return err } return nil })) @@ -241,18 +245,21 @@ func (l *loader) do(ctx context.Context, f func(res interface{}, err error), not break } - eg.Go(safety.RecoverFunc(func() error { + eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + notify(egctx, err) + err = nil + }() conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) if err != nil { - notify(egctx, err) - return nil + return err } defer notify(egctx, conn.Close()) res, err := l.loaderFunc(egctx, conn, r) f(res, err) if err != nil { - notify(egctx, err) + return err } return nil })) From 360e3ff0f494632e9cb7fd0a4d4586c2170052f0 Mon Sep 17 00:00:00 2001 From: vdaas-ci Date: Thu, 6 Aug 2020 03:20:32 +0000 Subject: [PATCH 13/14] :robot: Update license headers / Format go codes and yaml files Signed-off-by: vdaas-ci --- pkg/tools/cli/loadtest/service/loader.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 8def4b71f0..51052557e8 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -17,13 +17,14 @@ package service import ( "context" - "github.com/vdaas/vald/apis/grpc/payload" "os" "reflect" "sync/atomic" "syscall" "time" + "github.com/vdaas/vald/apis/grpc/payload" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" @@ -269,4 +270,4 @@ func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func err = errors.Errorf("undefined type: %s", l.operation.String()) } return -} \ No newline at end of file +} From 0985a92616f44c2de10347af923d300fc3128003 Mon Sep 17 00:00:00 2001 From: Kosuke Morimoto Date: Thu, 6 Aug 2020 15:35:32 +0900 Subject: [PATCH 14/14] fix for ci Signed-off-by: Kosuke Morimoto --- pkg/tools/cli/loadtest/config/config_test.go | 133 ++++ pkg/tools/cli/loadtest/service/insert_test.go | 725 +++++++++++++++++- pkg/tools/cli/loadtest/service/loader.go | 1 - .../loadtest/service/loader_option_test.go | 144 ++++ pkg/tools/cli/loadtest/service/loader_test.go | 202 ++++- pkg/tools/cli/loadtest/service/search_test.go | 283 ++++++- 6 files changed, 1441 insertions(+), 47 deletions(-) diff --git a/pkg/tools/cli/loadtest/config/config_test.go b/pkg/tools/cli/loadtest/config/config_test.go index 4b2bea9526..eeb98b9a3d 100644 --- a/pkg/tools/cli/loadtest/config/config_test.go +++ b/pkg/tools/cli/loadtest/config/config_test.go @@ -232,3 +232,136 @@ func TestNewConfig(t *testing.T) { }) } } + +func TestServiceMethod(t *testing.T) { + type args struct { + s string + } + type want struct { + want Service + } + type test struct { + name string + args args + want want + checkFunc func(want, Service) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got Service) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := ServiceMethod(test.args.s) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func TestService_String(t *testing.T) { + type want struct { + want string + } + type test struct { + name string + s Service + want want + checkFunc func(want, string) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got string) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := test.s.String() + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/pkg/tools/cli/loadtest/service/insert_test.go b/pkg/tools/cli/loadtest/service/insert_test.go index 46f0cf1a1b..afe9fc5592 100644 --- a/pkg/tools/cli/loadtest/service/insert_test.go +++ b/pkg/tools/cli/loadtest/service/insert_test.go @@ -18,24 +18,193 @@ package service import ( "reflect" "testing" + "time" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" + + "google.golang.org/grpc" // TODO: related to #557 ) -func Test_newInsert(t *testing.T) { +func Test_insertRequestProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + batchSize int + } type want struct { - want requestFunc - want1 loadFunc + wantF func() interface{} + wantSize int + err error } type test struct { name string + args args want want - checkFunc func(want, requestFunc, loadFunc) error - beforeFunc func() - afterFunc func() + checkFunc func(want, func() interface{}, int, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, gotF func() interface{}, gotSize int, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(gotF, w.wantF) { + return errors.Errorf("got = %v, want %v", gotF, w.wantF) + } + if !reflect.DeepEqual(gotSize, w.wantSize) { + return errors.Errorf("got = %v, want %v", gotSize, w.wantSize) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + dataset: nil, + batchSize: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + batchSize: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + gotF, gotSize, err := insertRequestProvider(test.args.dataset, test.args.batchSize) + if err := test.checkFunc(test.want, gotF, gotSize, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_objectVectorProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + } + type want struct { + want func() interface{} + want1 int + } + type test struct { + name string + args args + want want + checkFunc func(want, func() interface{}, int) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got func() interface{}, got1 int) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + if !reflect.DeepEqual(got1, w.want1) { + return errors.Errorf("got = %v, want %v", got1, w.want1) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got, got1 := objectVectorProvider(test.args.dataset) + if err := test.checkFunc(test.want, got, got1); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_objectVectorsProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + n int + } + type want struct { + want func() interface{} + want1 int } - defaultCheckFunc := func(w want, got requestFunc, got1 loadFunc) error { + type test struct { + name string + args args + want want + checkFunc func(want, func() interface{}, int) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got func() interface{}, got1 int) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want %v", got, w.want) } @@ -49,6 +218,392 @@ func Test_newInsert(t *testing.T) { /* { name: "test_case_1", + args: args { + dataset: nil, + n: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + n: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got, got1 := objectVectorsProvider(test.args.dataset, test.args.n) + if err := test.checkFunc(test.want, got, got1); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_agent(t *testing.T) { + type args struct { + conn *grpc.ClientConn + } + type want struct { + want inserter + } + type test struct { + name string + args args + want want + checkFunc func(want, inserter) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got inserter) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := agent(test.args.conn) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_gateway(t *testing.T) { + type args struct { + conn *grpc.ClientConn + } + type want struct { + want inserter + } + type test struct { + name string + args args + want want + checkFunc func(want, inserter) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got inserter) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := gateway(test.args.conn) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_insert(t *testing.T) { + type args struct { + c func(*grpc.ClientConn) inserter + } + type want struct { + want loadFunc + } + type test struct { + name string + args args + want want + checkFunc func(want, loadFunc) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got loadFunc) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := insert(test.args.c) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_bulkInsert(t *testing.T) { + type args struct { + c func(*grpc.ClientConn) inserter + } + type want struct { + want loadFunc + } + type test struct { + name string + args args + want want + checkFunc func(want, loadFunc) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got loadFunc) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := bulkInsert(test.args.c) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newInsert(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + wantF loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, gotF loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(gotF, w.wantF) { + return errors.Errorf("got = %v, want %v", gotF, w.wantF) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, }, @@ -59,6 +614,20 @@ func Test_newInsert(t *testing.T) { func() test { return test { name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, } @@ -68,7 +637,7 @@ func Test_newInsert(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -78,9 +647,145 @@ func Test_newInsert(t *testing.T) { if test.checkFunc == nil { test.checkFunc = defaultCheckFunc } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } - got, got1 := newInsert() - if err := test.checkFunc(test.want, got, got1); err != nil { + gotF, err := l.newInsert() + if err := test.checkFunc(test.want, gotF, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newStreamInsert(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + wantF loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, gotF loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(gotF, w.wantF) { + return errors.Errorf("got = %v, want %v", gotF, w.wantF) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } + + gotF, err := l.newStreamInsert() + if err := test.checkFunc(test.want, gotF, err); err != nil { tt.Errorf("error = %v", err) } diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 51052557e8..a02de0964e 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -24,7 +24,6 @@ import ( "time" "github.com/vdaas/vald/apis/grpc/payload" - "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" diff --git a/pkg/tools/cli/loadtest/service/loader_option_test.go b/pkg/tools/cli/loadtest/service/loader_option_test.go index 21193791a6..16379ed168 100644 --- a/pkg/tools/cli/loadtest/service/loader_option_test.go +++ b/pkg/tools/cli/loadtest/service/loader_option_test.go @@ -16,9 +16,11 @@ package service import ( + "reflect" "testing" "github.com/vdaas/vald/internal/errgroup" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "go.uber.org/goleak" ) @@ -813,3 +815,145 @@ func TestWithOperation(t *testing.T) { }) } } + +func TestWithBatchSize(t *testing.T) { + type args struct { + b int + } + type want struct { + want Option + } + type test struct { + name string + args args + want want + checkFunc func(want, Option) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got Option) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + b: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + b: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := WithBatchSize(test.args.b) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func TestWithService(t *testing.T) { + type args struct { + s string + } + type want struct { + want Option + } + type test struct { + name string + args args + want want + checkFunc func(want, Option) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got Option) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := WithService(test.args.s) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/pkg/tools/cli/loadtest/service/loader_test.go b/pkg/tools/cli/loadtest/service/loader_test.go index 4e04385277..8f543bbc76 100644 --- a/pkg/tools/cli/loadtest/service/loader_test.go +++ b/pkg/tools/cli/loadtest/service/loader_test.go @@ -23,7 +23,7 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/net/grpc" + igrpc "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" ) @@ -83,7 +83,7 @@ func TestNewLoader(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -109,14 +109,16 @@ func Test_loader_Prepare(t *testing.T) { } type fields struct { eg errgroup.Group - client grpc.Client + client igrpc.Client addr string concurrency int + batchSize int dataset string - requests []interface{} progressDuration time.Duration - requestsFunc requestFunc loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service operation config.Operation } type want struct { @@ -150,11 +152,13 @@ func Test_loader_Prepare(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, - loadFunc: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -175,11 +179,13 @@ func Test_loader_Prepare(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, - loadFunc: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -191,7 +197,7 @@ func Test_loader_Prepare(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -206,11 +212,13 @@ func Test_loader_Prepare(t *testing.T) { client: test.fields.client, addr: test.fields.addr, concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, dataset: test.fields.dataset, - requests: test.fields.requests, progressDuration: test.fields.progressDuration, - requestsFunc: test.fields.requestsFunc, loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, operation: test.fields.operation, } @@ -229,14 +237,16 @@ func Test_loader_Do(t *testing.T) { } type fields struct { eg errgroup.Group - client grpc.Client + client igrpc.Client addr string concurrency int + batchSize int dataset string - requests []interface{} progressDuration time.Duration - requestsFunc requestFunc loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service operation config.Operation } type want struct { @@ -270,11 +280,13 @@ func Test_loader_Do(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, - loadFunc: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -295,11 +307,13 @@ func Test_loader_Do(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, - loadFunc: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -311,7 +325,7 @@ func Test_loader_Do(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -326,11 +340,13 @@ func Test_loader_Do(t *testing.T) { client: test.fields.client, addr: test.fields.addr, concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, dataset: test.fields.dataset, - requests: test.fields.requests, progressDuration: test.fields.progressDuration, - requestsFunc: test.fields.requestsFunc, loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, operation: test.fields.operation, } @@ -342,3 +358,137 @@ func Test_loader_Do(t *testing.T) { }) } } + +func Test_loader_do(t *testing.T) { + type args struct { + ctx context.Context + f func(interface{}, error) + notify func(context.Context, error) + } + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + f: nil, + notify: nil, + }, + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + f: nil, + notify: nil, + }, + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } + + err := l.do(test.args.ctx, test.args.f, test.args.notify) + if err := test.checkFunc(test.want, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/pkg/tools/cli/loadtest/service/search_test.go b/pkg/tools/cli/loadtest/service/search_test.go index a87b3b6046..8452381a99 100644 --- a/pkg/tools/cli/loadtest/service/search_test.go +++ b/pkg/tools/cli/loadtest/service/search_test.go @@ -18,24 +18,37 @@ package service import ( "reflect" "testing" + "time" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" ) -func Test_newSearch(t *testing.T) { +func Test_searchRequestProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + } type want struct { - want requestFunc - want1 loadFunc + want func() interface{} + want1 int + err error } type test struct { name string + args args want want - checkFunc func(want, requestFunc, loadFunc) error - beforeFunc func() - afterFunc func() + checkFunc func(want, func() interface{}, int, error) error + beforeFunc func(args) + afterFunc func(args) } - defaultCheckFunc := func(w want, got requestFunc, got1 loadFunc) error { + defaultCheckFunc := func(w want, got func() interface{}, got1 int, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want %v", got, w.want) } @@ -49,6 +62,106 @@ func Test_newSearch(t *testing.T) { /* { name: "test_case_1", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got, got1, err := searchRequestProvider(test.args.dataset) + if err := test.checkFunc(test.want, got, got1, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newSearch(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + want loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, }, @@ -59,6 +172,20 @@ func Test_newSearch(t *testing.T) { func() test { return test { name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, } @@ -68,7 +195,7 @@ func Test_newSearch(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -78,9 +205,145 @@ func Test_newSearch(t *testing.T) { if test.checkFunc == nil { test.checkFunc = defaultCheckFunc } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } + + got, err := l.newSearch() + if err := test.checkFunc(test.want, got, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newStreamSearch(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + want loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } - got, got1 := newSearch() - if err := test.checkFunc(test.want, got, got1); err != nil { + got, err := l.newStreamSearch() + if err := test.checkFunc(test.want, got, err); err != nil { tt.Errorf("error = %v", err) }