Skip to content

Commit

Permalink
refactor load test
Browse files Browse the repository at this point in the history
  • Loading branch information
kmrmt committed Jul 6, 2020
1 parent a569549 commit 354f33e
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 113 deletions.
11 changes: 7 additions & 4 deletions cmd/tools/cli/loadtest/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ version: v0.0.0
time_zone: JST
logging:
logger: glg
level: debug
format: json
method: insert
level: info
format: raw
service: agent
operation: insert
dataset: fashion-mnist
concurrency: 100
addr: "localhost:8081"
batch_size: 10
progress_duration: 3s
addr: "localhost:8082"
client:
addrs: []
health_check_duration: 1s
Expand Down
8 changes: 4 additions & 4 deletions pkg/tools/cli/loadtest/assets/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ func Load(path string) (train, test, distances [][]float32, neighbors [][]int, d
}()
trainDim, v1, err := loadDataset(f, "train", loadFloat32)
if err != nil {
log.Error("couldn't load train dataset for path %s: %s", path, err)
log.Errorf("couldn't load train dataset for path %s: %s", path, err)
return nil, nil, nil, nil, 0, err
}
train = v1.([][]float32)
dim = trainDim
testDim, v2, err := loadDataset(f, "test", loadFloat32)
if err != nil {
log.Error("couldn't load test dataset for path %s: %s", path, err)
log.Errorf("couldn't load test dataset for path %s: %s", path, err)
return train, nil, nil, nil, dim, err
}
test = v2.([][]float32)
Expand All @@ -108,14 +108,14 @@ func Load(path string) (train, test, distances [][]float32, neighbors [][]int, d
}
distancesDim, v3, err := loadDataset(f, "distances", loadFloat32)
if err != nil {
log.Error("couldn't load distances dataset for path %s: %s", path, err)
log.Errorf("couldn't load distances dataset for path %s: %s", path, err)
return train, test, nil, nil, dim, err
}
distances = v3.([][]float32)

neighborsDim, v4, err := loadDataset(f, "neighbors", loadInt)
if err != nil {
log.Error("couldn't load neighbors dataset for path %s: %s", path, err)
log.Errorf("couldn't load neighbors dataset for path %s: %s", path, err)
return train, test, distances, nil, trainDim, err
}
neighbors = v4.([][]int)
Expand Down
68 changes: 57 additions & 11 deletions pkg/tools/cli/loadtest/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,80 @@ import (
// GlobalConfig is type alias of config.GlobalConfig.
type GlobalConfig = config.GlobalConfig

// Operation is type of implemented load test.
// Operation is operation type of implemented load test.
type Operation uint8

// Operation method.
// Operation method definition.
const (
Unknown Operation = iota
UnknownOperation Operation = iota
Insert
BulkInsert
StreamInsert
Search
StreamSearch
)

// OperationMethod convert string to Operation.
// OperationMethod converts string to Operation.
func OperationMethod(s string) Operation {
switch strings.ToLower(s) {
case "insert":
return Insert
case "bulkinsert":
return BulkInsert
case "streaminsert":
return StreamInsert
case "search":
return Search
case "streamsearch":
return StreamSearch
default:
return Unknown
return UnknownOperation
}
}

// String convert Operation to string.
// String converts Operation to string.
func (o Operation) String() string {
switch o {
case Insert:
return "insert"
return "Insert"
case Search:
return "search"
return "Search"
default:
return "unknown operation"
return "Unknown operation"
}
}

// Service is service type of implemented load test.
type Service uint8

// Service definitions.
const (
UnknownService Service = iota
Agent
Gateway
)

// ServiceMethod converts string to Service.
func ServiceMethod(s string) Service {
switch strings.ToLower(s) {
case "agent":
return Agent
case "gateway":
return Gateway
default:
return UnknownService
}
}

// String converts Service to string.
func (s Service) String() string {
switch s {
case Agent:
return "Agent"
case Gateway:
return "Gateway"
default:
return "Unknown service"
}
}

Expand All @@ -65,9 +108,11 @@ func (o Operation) String() string {
type Data struct {
config.GlobalConfig `json:",inline" yaml:",inline"`
Addr string `json:"addr" yaml:"addr"`
Method string `json:"method" yaml:"method"`
Service string `json:"service" yaml:"service"`
Operation string `json:"operation" yaml:"operation"`
Dataset string `json:"dataset" yaml:"dataset"`
Concurrency int `json:"concurrency" yaml:"concurrency"`
BatchSize int `json:"batch_size" yaml:"batch_size"`
ProgressDuration string `json:"progress_duration" yaml:"progress_duration"`
Client *config.GRPCClient `json:"client" yaml:"client"`
}
Expand All @@ -88,9 +133,10 @@ func NewConfig(path string) (cfg *Data, err error) {
}

cfg.Addr = config.GetActualValue(cfg.Addr)
cfg.Method = config.GetActualValue(cfg.Method)
cfg.Operation = config.GetActualValue(cfg.Operation)
cfg.Dataset = config.GetActualValue(cfg.Dataset)
cfg.ProgressDuration = config.GetActualValue(cfg.ProgressDuration)
cfg.Service = config.GetActualValue(cfg.Service)

return cfg, nil
}
145 changes: 130 additions & 15 deletions pkg/tools/cli/loadtest/service/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,143 @@ package service

import (
"context"
"sync"

"github.com/vdaas/vald/apis/grpc/agent/core"
"github.com/vdaas/vald/apis/grpc/gateway/vald"
"github.com/vdaas/vald/apis/grpc/payload"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/assets"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/config"
)

func newInsert() (requestFunc, loaderFunc) {
return func(dataset assets.Dataset) ([]interface{}, error) {
vectors := dataset.Train()
ids := dataset.IDs()
requests := make([]interface{}, len(vectors))
for j, v := range vectors {
requests[j] = &payload.Object_Vector{
Id: ids[j],
Vector: v,
}
func insertRequestProvider(dataset assets.Dataset, batchSize int) (f func() interface{}, size int, err error) {
switch {
case batchSize == 1:
f, size = objectVectorProvider(dataset)
case batchSize >= 2:
f, size = objectVectorsProvider(dataset, batchSize)
default:
err = errors.New("batch size must be natural number.")
}
if err != nil {
return nil, 0, err
}
return f, size, nil
}

func objectVectorProvider(dataset assets.Dataset) (func() interface{}, int) {
v := dataset.Train()
ids := dataset.IDs()
i := 0
size := len(v)
m := &sync.Mutex{}
return func() (ret interface{}) {
m.Lock()
defer m.Unlock()
if i < size {
ret = &payload.Object_Vector{
Id: ids[i],
Vector: v[i],
}
i++
}
return ret
}, size
}

func objectVectorsProvider(dataset assets.Dataset, n int) (func() interface{}, int) {
provider, s := objectVectorProvider(dataset)
size := s / n
if s % n != 0 {
size = size + 1
}
return func() (ret interface{}) {
v := make([]*payload.Object_Vector, 0, n)
for i := 0; i < n; i++ {
d := provider()
if d == nil {
break
}
return requests, nil
},
func(ctx context.Context, c vald.ValdClient, i interface{}, copts ...grpc.CallOption) error {
_, err := c.Insert(ctx, i.(*payload.Object_Vector), copts...)
return err
v = append(v, d.(*payload.Object_Vector))
}
return &payload.Object_Vectors{
Vectors: v,
}
}, size
}

type inserter interface {
Insert(context.Context, *payload.Object_Vector, ...grpc.CallOption) (*payload.Empty, error)
MultiInsert(context.Context, *payload.Object_Vectors, ...grpc.CallOption) (*payload.Empty, error)
}

func agent(conn *grpc.ClientConn) inserter {
return core.NewAgentClient(conn)
}

func gateway(conn *grpc.ClientConn) inserter {
return vald.NewValdClient(conn)
}

func insert(c func(*grpc.ClientConn) inserter) loadFunc {
return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) {
return c(conn).Insert(ctx, i.(*payload.Object_Vector), copts...)
}
}

func bulkInsert(c func(*grpc.ClientConn) inserter) loadFunc {
return func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) {
return c(conn).MultiInsert(ctx, i.(*payload.Object_Vectors), copts...)
}
}

func (l *loader) newInsert() (f loadFunc, err error) {
switch {
case l.batchSize == 1:
switch l.service {
case config.Agent:
f = insert(agent)
case config.Gateway:
f = insert(gateway)
default:
err = errors.Errorf("undefined service: %s", l.service.String())
}
case l.batchSize >= 2:
switch l.service {
case config.Agent:
f = bulkInsert(agent)
case config.Gateway:
f = bulkInsert(gateway)
default:
err = errors.Errorf("undefined service: %s", l.service.String())
}
default:
err = errors.Errorf("batch size must be natural number.")
}
if err != nil {
return nil, err
}
return f, nil
}

func (l *loader) newStreamInsert() (f loadFunc, err error) {
l.batchSize = 1
switch l.service {
case config.Agent:
f = func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) {
return core.NewAgentClient(conn).StreamInsert(ctx, copts...)
}
case config.Gateway:
f = func(ctx context.Context, conn *grpc.ClientConn, i interface{}, copts ...grpc.CallOption) (interface{}, error) {
return vald.NewValdClient(conn).StreamInsert(ctx, copts...)
}
default:
err = errors.Errorf("undefined service: %s", l.service.String())
}
if err != nil {
return nil, err
}
return f, nil
}
6 changes: 3 additions & 3 deletions pkg/tools/cli/loadtest/service/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
func Test_newInsert(t *testing.T) {
type want struct {
want requestFunc
want1 loaderFunc
want1 loadFunc
}
type test struct {
name string
want want
checkFunc func(want, requestFunc, loaderFunc) error
checkFunc func(want, requestFunc, loadFunc) error
beforeFunc func()
afterFunc func()
}
defaultCheckFunc := func(w want, got requestFunc, got1 loaderFunc) error {
defaultCheckFunc := func(w want, got requestFunc, got1 loadFunc) error {
if !reflect.DeepEqual(got, w.want) {
return errors.Errorf("got = %v, want %v", got, w.want)
}
Expand Down
Loading

0 comments on commit 354f33e

Please sign in to comment.