diff --git a/cmd/tools/cli/loadtest/sample.yaml b/cmd/tools/cli/loadtest/sample.yaml index 4675381147..509a61b327 100644 --- a/cmd/tools/cli/loadtest/sample.yaml +++ b/cmd/tools/cli/loadtest/sample.yaml @@ -19,12 +19,15 @@ version: v0.0.0 time_zone: JST logging: logger: glg - level: debug - format: json -method: insert + level: info + format: raw +service: agent +operation: insert dataset: fashion-mnist concurrency: 100 -addr: "localhost:8081" +batch_size: 10 +progress_duration: 3s +addr: "localhost:8082" client: addrs: [] health_check_duration: 1s diff --git a/pkg/tools/cli/loadtest/assets/loader.go b/pkg/tools/cli/loadtest/assets/loader.go index 9e3b154fa2..916f5b5d70 100644 --- a/pkg/tools/cli/loadtest/assets/loader.go +++ b/pkg/tools/cli/loadtest/assets/loader.go @@ -20,7 +20,6 @@ import ( "github.com/kpango/fuid" "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/log" "gonum.org/v1/hdf5" ) @@ -84,23 +83,20 @@ func loadDataset(file *hdf5.File, name string, f loaderFunc) (dim int, vec inter func Load(path string) (train, test, distances [][]float32, neighbors [][]int, dim int, err error) { f, err := hdf5.OpenFile(path, hdf5.F_ACC_RDONLY) if err != nil { - log.Errorf("couldn't open file %s: %s", path, err) - return nil, nil, nil, nil, 0, err + return nil, nil, nil, nil, 0, errors.Wrapf(err, "couldn't open file %s", path) } defer func() { err = f.Close() }() trainDim, v1, err := loadDataset(f, "train", loadFloat32) if err != nil { - log.Error("couldn't load train dataset for path %s: %s", path, err) - return nil, nil, nil, nil, 0, err + return nil, nil, nil, nil, 0, errors.Wrapf(err, "couldn't load train dataset for path %s", path) } train = v1.([][]float32) dim = trainDim testDim, v2, err := loadDataset(f, "test", loadFloat32) if err != nil { - log.Error("couldn't load test dataset for path %s: %s", path, err) - return train, nil, nil, nil, dim, err + return train, nil, nil, nil, dim, errors.Wrapf(err, "couldn't load test dataset for path %s", path) } test = v2.([][]float32) if dim != testDim { @@ -108,15 +104,13 @@ func Load(path string) (train, test, distances [][]float32, neighbors [][]int, d } distancesDim, v3, err := loadDataset(f, "distances", loadFloat32) if err != nil { - log.Error("couldn't load distances dataset for path %s: %s", path, err) - return train, test, nil, nil, dim, err + return train, test, nil, nil, dim, errors.Wrapf(err, "couldn't load distances dataset for path %s", path) } distances = v3.([][]float32) neighborsDim, v4, err := loadDataset(f, "neighbors", loadInt) if err != nil { - log.Error("couldn't load neighbors dataset for path %s: %s", path, err) - return train, test, distances, nil, trainDim, err + return train, test, distances, nil, trainDim, errors.Wrapf(err, "couldn't load neighbors dataset for path %s", path) } neighbors = v4.([][]int) if distancesDim != neighborsDim { diff --git a/pkg/tools/cli/loadtest/config/config.go b/pkg/tools/cli/loadtest/config/config.go index bbf9429449..6b81a124a8 100644 --- a/pkg/tools/cli/loadtest/config/config.go +++ b/pkg/tools/cli/loadtest/config/config.go @@ -26,37 +26,81 @@ import ( // GlobalConfig is type alias of config.GlobalConfig. type GlobalConfig = config.GlobalConfig -// Operation is type of implemented load test. +// Operation is operation type of implemented load test. type Operation uint8 -// Operation method. +// Operation method definition. const ( - Unknown Operation = iota + UnknownOperation Operation = iota Insert + StreamInsert Search + StreamSearch ) -// OperationMethod convert string to Operation. +// OperationMethod converts string to Operation. func OperationMethod(s string) Operation { switch strings.ToLower(s) { case "insert": return Insert + case "streaminsert": + return StreamInsert case "search": return Search + case "streamsearch": + return StreamSearch default: - return Unknown + return UnknownOperation } } -// String convert Operation to string. +// String converts Operation to string. func (o Operation) String() string { switch o { case Insert: - return "insert" + return "Insert" + case StreamInsert: + return "StreamInsert" case Search: - return "search" + return "Search" + case StreamSearch: + return "StreamSearch" default: - return "unknown operation" + return "Unknown operation" + } +} + +// Service is service type of implemented load test. +type Service uint8 + +// Service definitions. +const ( + UnknownService Service = iota + Agent + Gateway +) + +// ServiceMethod converts string to Service. +func ServiceMethod(s string) Service { + switch strings.ToLower(s) { + case "agent": + return Agent + case "gateway": + return Gateway + default: + return UnknownService + } +} + +// String converts Service to string. +func (s Service) String() string { + switch s { + case Agent: + return "Agent" + case Gateway: + return "Gateway" + default: + return "Unknown service" } } @@ -65,9 +109,11 @@ func (o Operation) String() string { type Data struct { config.GlobalConfig `json:",inline" yaml:",inline"` Addr string `json:"addr" yaml:"addr"` - Method string `json:"method" yaml:"method"` + Service string `json:"service" yaml:"service"` + Operation string `json:"operation" yaml:"operation"` Dataset string `json:"dataset" yaml:"dataset"` Concurrency int `json:"concurrency" yaml:"concurrency"` + BatchSize int `json:"batch_size" yaml:"batch_size"` ProgressDuration string `json:"progress_duration" yaml:"progress_duration"` Client *config.GRPCClient `json:"client" yaml:"client"` } @@ -88,9 +134,10 @@ func NewConfig(path string) (cfg *Data, err error) { } cfg.Addr = config.GetActualValue(cfg.Addr) - cfg.Method = config.GetActualValue(cfg.Method) + cfg.Operation = config.GetActualValue(cfg.Operation) cfg.Dataset = config.GetActualValue(cfg.Dataset) cfg.ProgressDuration = config.GetActualValue(cfg.ProgressDuration) + cfg.Service = config.GetActualValue(cfg.Service) return cfg, nil } diff --git a/pkg/tools/cli/loadtest/config/config_test.go b/pkg/tools/cli/loadtest/config/config_test.go index 4b2bea9526..eeb98b9a3d 100644 --- a/pkg/tools/cli/loadtest/config/config_test.go +++ b/pkg/tools/cli/loadtest/config/config_test.go @@ -232,3 +232,136 @@ func TestNewConfig(t *testing.T) { }) } } + +func TestServiceMethod(t *testing.T) { + type args struct { + s string + } + type want struct { + want Service + } + type test struct { + name string + args args + want want + checkFunc func(want, Service) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got Service) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := ServiceMethod(test.args.s) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func TestService_String(t *testing.T) { + type want struct { + want string + } + type test struct { + name string + s Service + want want + checkFunc func(want, string) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got string) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := test.s.String() + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/pkg/tools/cli/loadtest/service/insert.go b/pkg/tools/cli/loadtest/service/insert.go index 16175262a4..f876173533 100644 --- a/pkg/tools/cli/loadtest/service/insert.go +++ b/pkg/tools/cli/loadtest/service/insert.go @@ -17,28 +17,142 @@ package service import ( "context" + "sync/atomic" + "github.com/vdaas/vald/apis/grpc/agent/core" "github.com/vdaas/vald/apis/grpc/gateway/vald" "github.com/vdaas/vald/apis/grpc/payload" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" ) -func newInsert() (requestFunc, loaderFunc) { - return func(dataset assets.Dataset) ([]interface{}, error) { - vectors := dataset.Train() - ids := dataset.IDs() - requests := make([]interface{}, len(vectors)) - for j, v := range vectors { - requests[j] = &payload.Object_Vector{ - Id: ids[j], - Vector: v, - } +func insertRequestProvider(dataset assets.Dataset, batchSize int) (f func() interface{}, size int, err error) { + switch { + case batchSize == 1: + f, size = objectVectorProvider(dataset) + case batchSize >= 2: + f, size = objectVectorsProvider(dataset, batchSize) + default: + err = errors.New("batch size must be natural number.") + } + if err != nil { + return nil, 0, err + } + return f, size, nil +} + +func objectVectorProvider(dataset assets.Dataset) (func() interface{}, int) { + v := dataset.Train() + ids := dataset.IDs() + idx := int32(-1) + size := len(v) + return func() (ret interface{}) { + if i := int(atomic.AddInt32(&idx, 1)); i < size { + ret = &payload.Object_Vector{ + Id: ids[i], + Vector: v[i], } - return requests, nil - }, - func(ctx context.Context, c vald.ValdClient, i interface{}, copts ...grpc.CallOption) error { - _, err := c.Insert(ctx, i.(*payload.Object_Vector), copts...) - return err } + return ret + }, size +} + +func objectVectorsProvider(dataset assets.Dataset, n int) (func() interface{}, int) { + provider, s := objectVectorProvider(dataset) + size := s / n + if s%n != 0 { + size = size + 1 + } + return func() (ret interface{}) { + v := make([]*payload.Object_Vector, 0, n) + for i := 0; i < n; i++ { + d := provider() + if d == nil { + break + } + v = append(v, d.(*payload.Object_Vector)) + } + if len(v) == 0 { + return nil + } + return &payload.Object_Vectors{ + Vectors: v, + } + }, size +} + +type inserter interface { + Insert(context.Context, *payload.Object_Vector, ...grpc.CallOption) (*payload.Empty, error) + MultiInsert(context.Context, *payload.Object_Vectors, ...grpc.CallOption) (*payload.Empty, error) +} + +func agent(conn *grpc.ClientConn) inserter { + return core.NewAgentClient(conn) +} + +func gateway(conn *grpc.ClientConn) inserter { + return vald.NewValdClient(conn) +} + +func insert(c func(*grpc.ClientConn) inserter) loadFunc { + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return c(conn).Insert(ctx, i.(*payload.Object_Vector), copts...) + } +} + +func bulkInsert(c func(*grpc.ClientConn) inserter) loadFunc { + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return c(conn).MultiInsert(ctx, i.(*payload.Object_Vectors), copts...) + } +} + +func (l *loader) newInsert() (f loadFunc, err error) { + switch { + case l.batchSize == 1: + switch l.service { + case config.Agent: + f = insert(agent) + case config.Gateway: + f = insert(gateway) + default: + err = errors.Errorf("undefined service: %s", l.service.String()) + } + case l.batchSize >= 2: + switch l.service { + case config.Agent: + f = bulkInsert(agent) + case config.Gateway: + f = bulkInsert(gateway) + default: + err = errors.Errorf("undefined service: %s", l.service.String()) + } + default: + err = errors.New("batch size must be natural number.") + } + if err != nil { + return nil, err + } + return f, nil +} + +func (l *loader) newStreamInsert() (f loadFunc, err error) { + l.batchSize = 1 + switch l.service { + case config.Agent: + f = func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return core.NewAgentClient(conn).StreamInsert(ctx, copts...) + } + case config.Gateway: + f = func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewValdClient(conn).StreamInsert(ctx, copts...) + } + default: + err = errors.Errorf("undefined service: %s", l.service.String()) + } + if err != nil { + return nil, err + } + return f, nil } diff --git a/pkg/tools/cli/loadtest/service/insert_test.go b/pkg/tools/cli/loadtest/service/insert_test.go index 6424701519..afe9fc5592 100644 --- a/pkg/tools/cli/loadtest/service/insert_test.go +++ b/pkg/tools/cli/loadtest/service/insert_test.go @@ -18,24 +18,193 @@ package service import ( "reflect" "testing" + "time" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" + + "google.golang.org/grpc" // TODO: related to #557 ) -func Test_newInsert(t *testing.T) { +func Test_insertRequestProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + batchSize int + } type want struct { - want requestFunc - want1 loaderFunc + wantF func() interface{} + wantSize int + err error } type test struct { name string + args args want want - checkFunc func(want, requestFunc, loaderFunc) error - beforeFunc func() - afterFunc func() + checkFunc func(want, func() interface{}, int, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, gotF func() interface{}, gotSize int, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(gotF, w.wantF) { + return errors.Errorf("got = %v, want %v", gotF, w.wantF) + } + if !reflect.DeepEqual(gotSize, w.wantSize) { + return errors.Errorf("got = %v, want %v", gotSize, w.wantSize) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + dataset: nil, + batchSize: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + batchSize: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + gotF, gotSize, err := insertRequestProvider(test.args.dataset, test.args.batchSize) + if err := test.checkFunc(test.want, gotF, gotSize, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_objectVectorProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + } + type want struct { + want func() interface{} + want1 int + } + type test struct { + name string + args args + want want + checkFunc func(want, func() interface{}, int) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got func() interface{}, got1 int) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + if !reflect.DeepEqual(got1, w.want1) { + return errors.Errorf("got = %v, want %v", got1, w.want1) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got, got1 := objectVectorProvider(test.args.dataset) + if err := test.checkFunc(test.want, got, got1); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_objectVectorsProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + n int + } + type want struct { + want func() interface{} + want1 int } - defaultCheckFunc := func(w want, got requestFunc, got1 loaderFunc) error { + type test struct { + name string + args args + want want + checkFunc func(want, func() interface{}, int) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got func() interface{}, got1 int) error { if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want %v", got, w.want) } @@ -49,6 +218,392 @@ func Test_newInsert(t *testing.T) { /* { name: "test_case_1", + args: args { + dataset: nil, + n: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + n: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got, got1 := objectVectorsProvider(test.args.dataset, test.args.n) + if err := test.checkFunc(test.want, got, got1); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_agent(t *testing.T) { + type args struct { + conn *grpc.ClientConn + } + type want struct { + want inserter + } + type test struct { + name string + args args + want want + checkFunc func(want, inserter) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got inserter) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := agent(test.args.conn) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_gateway(t *testing.T) { + type args struct { + conn *grpc.ClientConn + } + type want struct { + want inserter + } + type test struct { + name string + args args + want want + checkFunc func(want, inserter) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got inserter) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + conn: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := gateway(test.args.conn) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_insert(t *testing.T) { + type args struct { + c func(*grpc.ClientConn) inserter + } + type want struct { + want loadFunc + } + type test struct { + name string + args args + want want + checkFunc func(want, loadFunc) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got loadFunc) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := insert(test.args.c) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_bulkInsert(t *testing.T) { + type args struct { + c func(*grpc.ClientConn) inserter + } + type want struct { + want loadFunc + } + type test struct { + name string + args args + want want + checkFunc func(want, loadFunc) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got loadFunc) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + c: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := bulkInsert(test.args.c) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newInsert(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + wantF loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, gotF loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(gotF, w.wantF) { + return errors.Errorf("got = %v, want %v", gotF, w.wantF) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, }, @@ -59,6 +614,20 @@ func Test_newInsert(t *testing.T) { func() test { return test { name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, } @@ -68,7 +637,7 @@ func Test_newInsert(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -78,9 +647,145 @@ func Test_newInsert(t *testing.T) { if test.checkFunc == nil { test.checkFunc = defaultCheckFunc } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } - got, got1 := newInsert() - if err := test.checkFunc(test.want, got, got1); err != nil { + gotF, err := l.newInsert() + if err := test.checkFunc(test.want, gotF, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newStreamInsert(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + wantF loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, gotF loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(gotF, w.wantF) { + return errors.Errorf("got = %v, want %v", gotF, w.wantF) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } + + gotF, err := l.newStreamInsert() + if err := test.checkFunc(test.want, gotF, err); err != nil { tt.Errorf("error = %v", err) } diff --git a/pkg/tools/cli/loadtest/service/loader.go b/pkg/tools/cli/loadtest/service/loader.go index 842c181ddd..a02de0964e 100644 --- a/pkg/tools/cli/loadtest/service/loader.go +++ b/pkg/tools/cli/loadtest/service/loader.go @@ -23,14 +23,16 @@ import ( "syscall" "time" - "github.com/vdaas/vald/apis/grpc/gateway/vald" + "github.com/vdaas/vald/apis/grpc/payload" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/net/grpc" + igrpc "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" + + "google.golang.org/grpc" // TODO: related to #557 ) // Loader is representation of load test @@ -40,20 +42,21 @@ type Loader interface { } type ( - requestFunc func(assets.Dataset) ([]interface{}, error) - loaderFunc func(context.Context, vald.ValdClient, interface{}, ...grpc.CallOption) error + loadFunc func(context.Context, *grpc.ClientConn, interface{}, ...grpc.CallOption) (interface{}, error) ) type loader struct { eg errgroup.Group - client grpc.Client + client igrpc.Client addr string concurrency int + batchSize int dataset string - requests []interface{} progressDuration time.Duration - requestsFunc requestFunc - loaderFunc loaderFunc + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service operation config.Operation } @@ -66,13 +69,21 @@ func NewLoader(opts ...Option) (Loader, error) { } } + var err error switch l.operation { case config.Insert: - l.requestsFunc, l.loaderFunc = newInsert() + l.loaderFunc, err = l.newInsert() + case config.StreamInsert: + l.loaderFunc, err = l.newStreamInsert() case config.Search: - l.requestsFunc, l.loaderFunc = newSearch() + l.loaderFunc, err = l.newSearch() + case config.StreamSearch: + l.loaderFunc, err = l.newStreamSearch() default: - return nil, errors.Errorf("undefined method: %v", l.operation) + err = errors.Errorf("undefined operation: %s", l.operation.String()) + } + if err != nil { + return nil, err } return l, nil @@ -88,48 +99,64 @@ func (l *loader) Prepare(context.Context) (err error) { if err != nil { return err } - l.requests, err = l.requestsFunc(dataset) + + switch l.operation { + case config.Insert, config.StreamInsert: + l.dataProvider, l.dataSize, err = insertRequestProvider(dataset, l.batchSize) + case config.Search, config.StreamSearch: + l.dataProvider, l.dataSize, err = searchRequestProvider(dataset) + } if err != nil { return err } - if len(l.requests) == 0 { - return errors.New("prepare data is empty") - } - return nil } // Do operates load test. func (l *loader) Do(ctx context.Context) <-chan error { - ech := make(chan error, len(l.requests)) - - // TODO: related to #403. - p, err := os.FindProcess(os.Getpid()) - if err != nil { + ech := make(chan error, l.dataSize) + finalize := func(ctx context.Context, err error) { select { case <-ctx.Done(): ech <- errors.Wrap(err, ctx.Err().Error()) case ech <- err: } + } + + // TODO: related to #403. + p, err := os.FindProcess(os.Getpid()) + if err != nil { + finalize(ctx, err) return ech } - var pgCnt int32 = 0 - var start time.Time + var pgCnt, errCnt int32 = 0, 0 + var start, end time.Time + vps := func(count int, t1, t2 time.Time) float64 { + return float64(count) / t2.Sub(t1).Seconds() + } progress := func() { - log.Infof("progress %d items, %f[qps]", pgCnt, float64(pgCnt)/time.Now().Sub(start).Seconds()) + log.Infof("progress %d requests, %f[vps], error: %d", pgCnt, vps(int(pgCnt)*l.batchSize, start, time.Now()), errCnt) + } + + f := func(i interface{}, err error) { + atomic.AddInt32(&pgCnt, 1) + if err != nil { + atomic.AddInt32(&errCnt, 1) + } } + + ticker := time.NewTicker(l.progressDuration) l.eg.Go(safety.RecoverFunc(func() error { - ticker := time.NewTicker(l.progressDuration) defer ticker.Stop() - for pgCnt != int32(len(l.requests)) { + for int(pgCnt) < l.dataSize { if err := func() error { - defer progress() select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: + progress() return nil } }(); err != nil { @@ -138,45 +165,108 @@ func (l *loader) Do(ctx context.Context) <-chan error { } return nil })) - start = time.Now() - var errCnt int32 = 0 + l.eg.Go(safety.RecoverFunc(func() error { + log.Infof("start load test(%s, %s)", l.service.String(), l.operation.String()) defer close(ech) - eg, egctx := errgroup.New(ctx) - eg.Limitation(l.concurrency) - for _, req := range l.requests { - r := req - eg.Go(safety.RecoverFunc( - func() error { - _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { - err := l.loaderFunc(ctx, vald.NewValdClient(conn), r, copts...) - atomic.AddInt32(&pgCnt, 1) - if err != nil { - log.Warn("an error occurred while executing loaderfunc:", err) - atomic.AddInt32(&errCnt, 1) - } - return nil, err - }) - if err != nil { - select { - case <-ctx.Done(): - ech <- errors.Wrap(err, ctx.Err().Error()) - case ech <- err: - } - } - return nil - })) + defer ticker.Stop() + start = time.Now() + err := l.do(ctx, f, finalize) + end = time.Now() + + if errCnt > 0 { + log.Warnf("Error ratio: %.2f%%", float64(errCnt)/float64(pgCnt)*100) } - if err := eg.Wait(); err != nil { - select { - case <-ctx.Done(): - ech <- errors.Wrap(err, ctx.Err().Error()) - case ech <- err: - } + if err != nil { + finalize(ctx, err) return p.Signal(syscall.SIGKILL) // TODO: #403 } - log.Infof("Error ratio: %.2f%%", float64(errCnt)/float64(pgCnt)*100) + log.Infof("result:%s\t%d\t%d\t%f", l.service.String(), l.concurrency, l.batchSize, vps(int(pgCnt)*l.batchSize, start, end)) + return p.Signal(syscall.SIGTERM) // TODO: #403 })) return ech } + +func (l *loader) do(ctx context.Context, f func(interface{}, error), notify func(context.Context, error)) (err error) { + eg, egctx := errgroup.New(ctx) + + switch l.operation { + case config.StreamInsert, config.StreamSearch: + var newData func() interface{} + switch l.operation { + case config.StreamInsert: + newData = func() interface{} { + return new(payload.Empty) + } + case config.StreamSearch: + newData = func() interface{} { + return new(payload.Search_Response) + } + } + eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + if err != nil { + notify(egctx, err) + err = nil + } + }() + // TODO: related to #557 + /* + _, err := l.client.Do(egctx, l.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) { + st, err := l.loaderFunc(ctx, conn, nil, copts...) + if err != nil { + return nil, err + } + return nil, grpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f) + }) + */ + conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) + if err != nil { + return err + } + defer notify(egctx, conn.Close()) + st, err := l.loaderFunc(egctx, conn, nil) + if err != nil { + return err + } + if err := igrpc.BidirectionalStreamClient(st.(grpc.ClientStream), l.dataProvider, newData, f); err != nil { + return err + } + return nil + })) + err = eg.Wait() + case config.Insert, config.Search: + eg.Limitation(l.concurrency) + + for { + r := l.dataProvider() + if r == nil { + break + } + + eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + notify(egctx, err) + err = nil + }() + conn, err := grpc.Dial(l.addr, grpc.WithInsecure()) + if err != nil { + return err + } + defer notify(egctx, conn.Close()) + + res, err := l.loaderFunc(egctx, conn, r) + f(res, err) + if err != nil { + return err + } + return nil + })) + } + err = eg.Wait() + default: + err = errors.Errorf("undefined type: %s", l.operation.String()) + } + return +} diff --git a/pkg/tools/cli/loadtest/service/loader_option.go b/pkg/tools/cli/loadtest/service/loader_option.go index 23072dc69c..731c45ebb6 100644 --- a/pkg/tools/cli/loadtest/service/loader_option.go +++ b/pkg/tools/cli/loadtest/service/loader_option.go @@ -29,6 +29,7 @@ type Option func(*loader) error var ( defaultOpts = []Option{ WithConcurrency(100), + WithBatchSize(1), WithErrGroup(errgroup.Get()), WithProgressDuration("5s"), } @@ -56,9 +57,21 @@ func WithClient(c grpc.Client) Option { // WithConcurrency sets load test concurrency. func WithConcurrency(c int) Option { return func(l *loader) error { - if c > 0 { - l.concurrency = c + if c <= 0 { + return errors.New("concurrency must be natural number") } + l.concurrency = c + return nil + } +} + +// WithBatchSize sets load test batch size. +func WithBatchSize(b int) Option { + return func(l *loader) error { + if b <= 0 { + return errors.New("batch size must be natural number") + } + l.batchSize = b return nil } } @@ -103,3 +116,11 @@ func WithOperation(op string) Option { return nil } } + +// WithService sets service of load test. +func WithService(s string) Option { + return func(l *loader) error { + l.service = config.ServiceMethod(s) + return nil + } +} diff --git a/pkg/tools/cli/loadtest/service/loader_option_test.go b/pkg/tools/cli/loadtest/service/loader_option_test.go index 21193791a6..16379ed168 100644 --- a/pkg/tools/cli/loadtest/service/loader_option_test.go +++ b/pkg/tools/cli/loadtest/service/loader_option_test.go @@ -16,9 +16,11 @@ package service import ( + "reflect" "testing" "github.com/vdaas/vald/internal/errgroup" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "go.uber.org/goleak" ) @@ -813,3 +815,145 @@ func TestWithOperation(t *testing.T) { }) } } + +func TestWithBatchSize(t *testing.T) { + type args struct { + b int + } + type want struct { + want Option + } + type test struct { + name string + args args + want want + checkFunc func(want, Option) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got Option) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + b: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + b: 0, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := WithBatchSize(test.args.b) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func TestWithService(t *testing.T) { + type args struct { + s string + } + type want struct { + want Option + } + type test struct { + name string + args args + want want + checkFunc func(want, Option) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, got Option) error { + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + s: "", + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got := WithService(test.args.s) + if err := test.checkFunc(test.want, got); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/pkg/tools/cli/loadtest/service/loader_test.go b/pkg/tools/cli/loadtest/service/loader_test.go index c0c66fd757..8f543bbc76 100644 --- a/pkg/tools/cli/loadtest/service/loader_test.go +++ b/pkg/tools/cli/loadtest/service/loader_test.go @@ -23,7 +23,7 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/net/grpc" + igrpc "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" ) @@ -83,7 +83,7 @@ func TestNewLoader(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -109,14 +109,16 @@ func Test_loader_Prepare(t *testing.T) { } type fields struct { eg errgroup.Group - client grpc.Client + client igrpc.Client addr string concurrency int + batchSize int dataset string - requests []interface{} progressDuration time.Duration - requestsFunc requestFunc - loaderFunc loaderFunc + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service operation config.Operation } type want struct { @@ -150,11 +152,13 @@ func Test_loader_Prepare(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -175,11 +179,13 @@ func Test_loader_Prepare(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -191,7 +197,7 @@ func Test_loader_Prepare(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -206,11 +212,13 @@ func Test_loader_Prepare(t *testing.T) { client: test.fields.client, addr: test.fields.addr, concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, dataset: test.fields.dataset, - requests: test.fields.requests, progressDuration: test.fields.progressDuration, - requestsFunc: test.fields.requestsFunc, loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, operation: test.fields.operation, } @@ -229,14 +237,16 @@ func Test_loader_Do(t *testing.T) { } type fields struct { eg errgroup.Group - client grpc.Client + client igrpc.Client addr string concurrency int + batchSize int dataset string - requests []interface{} progressDuration time.Duration - requestsFunc requestFunc - loaderFunc loaderFunc + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service operation config.Operation } type want struct { @@ -270,11 +280,13 @@ func Test_loader_Do(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -295,11 +307,13 @@ func Test_loader_Do(t *testing.T) { client: nil, addr: "", concurrency: 0, + batchSize: 0, dataset: "", - requests: nil, progressDuration: nil, - requestsFunc: nil, loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, operation: nil, }, want: want{}, @@ -311,7 +325,7 @@ func Test_loader_Do(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -326,11 +340,13 @@ func Test_loader_Do(t *testing.T) { client: test.fields.client, addr: test.fields.addr, concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, dataset: test.fields.dataset, - requests: test.fields.requests, progressDuration: test.fields.progressDuration, - requestsFunc: test.fields.requestsFunc, loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, operation: test.fields.operation, } @@ -342,3 +358,137 @@ func Test_loader_Do(t *testing.T) { }) } } + +func Test_loader_do(t *testing.T) { + type args struct { + ctx context.Context + f func(interface{}, error) + notify func(context.Context, error) + } + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, error) error + beforeFunc func(args) + afterFunc func(args) + } + defaultCheckFunc := func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + args: args { + ctx: nil, + f: nil, + notify: nil, + }, + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + ctx: nil, + f: nil, + notify: nil, + }, + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } + + err := l.do(test.args.ctx, test.args.f, test.args.notify) + if err := test.checkFunc(test.want, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} diff --git a/pkg/tools/cli/loadtest/service/search.go b/pkg/tools/cli/loadtest/service/search.go index e4f129b3fc..b23bc5c7f3 100644 --- a/pkg/tools/cli/loadtest/service/search.go +++ b/pkg/tools/cli/loadtest/service/search.go @@ -17,26 +17,57 @@ package service import ( "context" + "sync/atomic" + "github.com/vdaas/vald/apis/grpc/agent/core" "github.com/vdaas/vald/apis/grpc/gateway/vald" "github.com/vdaas/vald/apis/grpc/payload" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" ) -func newSearch() (requestFunc, loaderFunc) { - return func(dataset assets.Dataset) ([]interface{}, error) { - vectors := dataset.Query() - requests := make([]interface{}, len(vectors)) - for j, v := range vectors { - requests[j] = &payload.Search_Request{ - Vector: v, - } +func searchRequestProvider(dataset assets.Dataset) (func() interface{}, int, error) { + v := dataset.Query() + size := len(v) + idx := int32(-1) + return func() (ret interface{}) { + if i := int(atomic.AddInt32(&idx, 1)); i < size { + ret = &payload.Search_Request{ + Vector: v[i], } - return requests, nil - }, - func(ctx context.Context, c vald.ValdClient, i interface{}, copts ...grpc.CallOption) error { - _, err := c.Search(ctx, i.(*payload.Search_Request), copts...) - return err } + return ret + }, size, nil +} + +func (l *loader) newSearch() (loadFunc, error) { + switch l.service { + case config.Agent: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return core.NewAgentClient(conn).Search(ctx, i.(*payload.Search_Request), copts...) + }, nil + case config.Gateway: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewValdClient(conn).Search(ctx, i.(*payload.Search_Request), copts...) + }, nil + default: + return nil, errors.Errorf("undefined service: %s", l.service.String()) + } +} + +func (l *loader) newStreamSearch() (loadFunc, error) { + switch l.service { + case config.Agent: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return core.NewAgentClient(conn).StreamSearch(ctx, copts...) + }, nil + case config.Gateway: + return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) { + return vald.NewValdClient(conn).StreamSearch(ctx, copts...) + }, nil + default: + return nil, errors.Errorf("undefined service: %s", l.service.String()) + } } diff --git a/pkg/tools/cli/loadtest/service/search_test.go b/pkg/tools/cli/loadtest/service/search_test.go index 5504c779c2..8452381a99 100644 --- a/pkg/tools/cli/loadtest/service/search_test.go +++ b/pkg/tools/cli/loadtest/service/search_test.go @@ -18,24 +18,37 @@ package service import ( "reflect" "testing" + "time" + "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/assets" + "github.com/vdaas/vald/pkg/tools/cli/loadtest/config" "go.uber.org/goleak" ) -func Test_newSearch(t *testing.T) { +func Test_searchRequestProvider(t *testing.T) { + type args struct { + dataset assets.Dataset + } type want struct { - want requestFunc - want1 loaderFunc + want func() interface{} + want1 int + err error } type test struct { name string + args args want want - checkFunc func(want, requestFunc, loaderFunc) error - beforeFunc func() - afterFunc func() + checkFunc func(want, func() interface{}, int, error) error + beforeFunc func(args) + afterFunc func(args) } - defaultCheckFunc := func(w want, got requestFunc, got1 loaderFunc) error { + defaultCheckFunc := func(w want, got func() interface{}, got1 int, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } if !reflect.DeepEqual(got, w.want) { return errors.Errorf("got = %v, want %v", got, w.want) } @@ -49,6 +62,106 @@ func Test_newSearch(t *testing.T) { /* { name: "test_case_1", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + args: args { + dataset: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(test.args) + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + + got, got1, err := searchRequestProvider(test.args.dataset) + if err := test.checkFunc(test.want, got, got1, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newSearch(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + want loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, }, @@ -59,6 +172,20 @@ func Test_newSearch(t *testing.T) { func() test { return test { name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, want: want{}, checkFunc: defaultCheckFunc, } @@ -68,7 +195,7 @@ func Test_newSearch(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -78,9 +205,145 @@ func Test_newSearch(t *testing.T) { if test.checkFunc == nil { test.checkFunc = defaultCheckFunc } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } + + got, err := l.newSearch() + if err := test.checkFunc(test.want, got, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + +func Test_loader_newStreamSearch(t *testing.T) { + type fields struct { + eg errgroup.Group + client igrpc.Client + addr string + concurrency int + batchSize int + dataset string + progressDuration time.Duration + loaderFunc loadFunc + dataProvider func() interface{} + dataSize int + service config.Service + operation config.Operation + } + type want struct { + want loadFunc + err error + } + type test struct { + name string + fields fields + want want + checkFunc func(want, loadFunc, error) error + beforeFunc func() + afterFunc func() + } + defaultCheckFunc := func(w want, got loadFunc, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got error = %v, want %v", err, w.err) + } + if !reflect.DeepEqual(got, w.want) { + return errors.Errorf("got = %v, want %v", got, w.want) + } + return nil + } + tests := []test{ + // TODO test cases + /* + { + name: "test_case_1", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + }, + */ + + // TODO test cases + /* + func() test { + return test { + name: "test_case_2", + fields: fields { + eg: nil, + client: nil, + addr: "", + concurrency: 0, + batchSize: 0, + dataset: "", + progressDuration: nil, + loaderFunc: nil, + dataProvider: nil, + dataSize: 0, + service: nil, + operation: nil, + }, + want: want{}, + checkFunc: defaultCheckFunc, + } + }(), + */ + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt) + if test.beforeFunc != nil { + test.beforeFunc() + } + if test.afterFunc != nil { + defer test.afterFunc() + } + if test.checkFunc == nil { + test.checkFunc = defaultCheckFunc + } + l := &loader{ + eg: test.fields.eg, + client: test.fields.client, + addr: test.fields.addr, + concurrency: test.fields.concurrency, + batchSize: test.fields.batchSize, + dataset: test.fields.dataset, + progressDuration: test.fields.progressDuration, + loaderFunc: test.fields.loaderFunc, + dataProvider: test.fields.dataProvider, + dataSize: test.fields.dataSize, + service: test.fields.service, + operation: test.fields.operation, + } - got, got1 := newSearch() - if err := test.checkFunc(test.want, got, got1); err != nil { + got, err := l.newStreamSearch() + if err := test.checkFunc(test.want, got, err); err != nil { tt.Errorf("error = %v", err) } diff --git a/pkg/tools/cli/loadtest/usecase/load.go b/pkg/tools/cli/loadtest/usecase/load.go index 7f78fb3d25..958a0b933d 100644 --- a/pkg/tools/cli/loadtest/usecase/load.go +++ b/pkg/tools/cli/loadtest/usecase/load.go @@ -49,12 +49,14 @@ func New(cfg *config.Data) (r runner.Runner, err error) { run.client = grpc.New(clientOpts...) run.loader, err = service.NewLoader( - service.WithOperation(cfg.Method), + service.WithOperation(cfg.Operation), service.WithAddr(cfg.Addr), + service.WithBatchSize(cfg.BatchSize), service.WithDataset(cfg.Dataset), service.WithClient(run.client), service.WithConcurrency(cfg.Concurrency), service.WithProgressDuration(cfg.ProgressDuration), + service.WithService(cfg.Service), ) if err != nil { return nil, err @@ -70,11 +72,16 @@ func (r *run) PreStart(ctx context.Context) (err error) { // Start runs load test and returns error if occurred. func (r *run) Start(ctx context.Context) (<-chan error, error) { - rech, err := r.client.StartConnectionMonitor(ctx) - if err != nil { - return nil, err - } + // TODO: related to #557 + /* + rech, err := r.client.StartConnectionMonitor(ctx) + if err != nil { + return nil, err + } + */ + lech := r.loader.Do(ctx) + ech := make(chan error, 1000) // TODO: fix magic number r.eg.Go(safety.RecoverFunc(func() (err error) { defer close(ech) @@ -87,7 +94,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { } } err = ctx.Err() - if err != nil && err != context.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { errs = errors.Wrap(errs, err.Error()) } return errs @@ -96,7 +103,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { select { case <-ctx.Done(): return finalize() - case err = <-rech: + //case err = <-rech: // TODO: related to #557 case err = <-lech: } if err != nil {