Skip to content

Commit

Permalink
use internal/net/grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
kmrmt committed May 20, 2020
1 parent ff0e454 commit 8c0bce0
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 41 deletions.
2 changes: 1 addition & 1 deletion cmd/cli/loadtest/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ logging:
method: insert
dataset: fashion-mnist
concurrency: 100
address: "gateway.vald.dssk-01.vald.k8s.bp.dssk01.caas.ssk.zcp.yahoo.co.jp:80"
addr: "gateway.vald.dssk-01.vald.k8s.bp.dssk01.caas.ssk.zcp.yahoo.co.jp:80"
client:
addrs: []
health_check_duration: 1s
Expand Down
7 changes: 5 additions & 2 deletions pkg/tools/cli/loadtest/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ type GlobalConfig = config.GlobalConfig
// In K8s environment, this configuration is stored in K8s ConfigMap.
type Data struct {
config.GlobalConfig `json:",inline" yaml:",inline"`
Client *config.GRPCClient `json:"client" yaml:"client"`
Address string `json:"address" yaml:"address"`
Addr string `json:"addr" yaml:"addr"`
Method string `json:"method" yaml:"method"`
Dataset string `json:"dataset" yaml:"dataset"`
Concurrency int `json:"concurrency" yaml:"concurrency"`
Client *config.GRPCClient `json:"client" yaml:"client"`
}

func NewConfig(path string) (cfg *Data, err error) {
Expand All @@ -44,6 +44,9 @@ func NewConfig(path string) (cfg *Data, err error) {
if cfg != nil {
cfg.Bind()
}
if cfg.Client != nil {
cfg.Client.Bind()
}

return cfg, nil
}
42 changes: 23 additions & 19 deletions pkg/tools/cli/loadtest/service/insert/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@ package insert
import (
"context"
"fmt"
"github.com/vdaas/vald/internal/log"
"reflect"
"sync"

"github.com/vdaas/vald/internal/client"
"github.com/vdaas/vald/apis/grpc/gateway/vald"
"github.com/vdaas/vald/apis/grpc/payload"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/assets"
)

type insert struct {
eg errgroup.Group

w client.Writer
c int
n string

req []*client.ObjectVector
eg errgroup.Group
client grpc.Client
addr string
concurrency int
dataset string
req []*payload.Object_Vector
}

func New(opts ...InsertOption) (i *insert, err error) {
func New(opts ...Option) (i *insert, err error) {
i = new(insert)
for _, opt := range append(defaultInsertOpts, opts...) {
for _, opt := range append(defaultOpts, opts...) {
if err = opt(i); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
Expand All @@ -52,19 +53,19 @@ func New(opts ...InsertOption) (i *insert, err error) {
}

func (i *insert) Prepare(ctx context.Context) error {
fn := assets.Data(i.n)
fn := assets.Data(i.dataset)
if fn == nil {
return fmt.Errorf("dataset load funciton is nil: %s", i.n)
return fmt.Errorf("dataset load funciton is nil: %s", i.dataset)
}
dataset, err := fn()
if err != nil {
return err
}
vectors := dataset.Train()
ids := dataset.IDs()
i.req = make([]*client.ObjectVector, len(vectors))
i.req = make([]*payload.Object_Vector, len(vectors))
for j, v := range vectors {
i.req[j] = &client.ObjectVector{
i.req[j] = &payload.Object_Vector{
Id: ids[j],
Vector: v,
}
Expand All @@ -73,21 +74,24 @@ func (i *insert) Prepare(ctx context.Context) error {
}

func (i *insert) Do(ctx context.Context) <-chan error {
errCh := make(chan error, len(i.req)*10)
errCh := make(chan error, len(i.req))
log.Debugf("insert %d items", len(i.req))
i.eg.Go(safety.RecoverFunc(func() error {
defer close(errCh)
wg := new(sync.WaitGroup)
sem := make(chan struct{}, i.c)
sem := make(chan struct{}, i.concurrency)
for _, req := range i.req {
wg.Add(1)
sem <- struct{}{}
go func(r *client.ObjectVector) {
go func(r *payload.Object_Vector) {
defer wg.Done()
defer func() {
<-sem
}()
err := i.w.Insert(ctx, r)
_, err := i.client.Do(ctx, i.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
_, err := vald.NewValdClient(conn).Insert(ctx, req, copts...)
return nil, err
})
if err != nil {
errCh <- err
}
Expand Down
25 changes: 16 additions & 9 deletions pkg/tools/cli/loadtest/service/insert/insert_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,41 @@
package insert

import (
"github.com/vdaas/vald/internal/client"
"github.com/vdaas/vald/internal/net/grpc"
)

type InsertOption func(*insert) error
type Option func(*insert) error

var (
defaultInsertOpts = []InsertOption{
defaultOpts = []Option{
WithConcurrency(100),
}
)

func WithWriter(w client.Writer) InsertOption {
func WithAddr(a string) Option {
return func(i *insert) error {
i.w = w
i.addr = a
return nil
}
}

func WithConcurrency(c int) InsertOption {
func WithClient(c grpc.Client) Option {
return func(i *insert) error {
i.c = c
i.client = c
return nil
}
}

func WithDataset(n string) InsertOption {
func WithConcurrency(c int) Option {
return func(i *insert) error {
i.n = n
i.concurrency = c
return nil
}
}

func WithDataset(n string) Option {
return func(i *insert) error {
i.dataset = n
return nil
}
}
69 changes: 59 additions & 10 deletions pkg/tools/cli/loadtest/usecase/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,24 @@ package usecase
import (
"context"
"fmt"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/safety"

"github.com/vdaas/vald/internal/client/gateway/vald/grpc"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/runner"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/config"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/service"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/service/insert"
"github.com/vdaas/vald/pkg/tools/cli/loadtest/service/search"
//"github.com/vdaas/vald/pkg/tools/cli/loadtest/service/search"
)

type run struct {
eg errgroup.Group
cfg *config.Data
loader service.Load
client grpc.Client
}

func New(cfg *config.Data) (r runner.Runner, err error) {
Expand All @@ -45,15 +49,17 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
}

func (r *run) PreStart(ctx context.Context) (err error) {
c, err := grpc.New(ctx, grpc.WithAddr(r.cfg.Address), grpc.WithGRPCClientConfig(r.cfg.Client))
if err != nil {
return fmt.Errorf("grpc connection error")
}
//c, err := grpc.New(ctx, grpc.WithAddr(r.cfg.Address), grpc.WithGRPCClientConfig(r.cfg.Client))
r.client = grpc.New(
grpc.WithAddrs(append([]string{r.cfg.Addr}, r.cfg.Client.Addrs...)...),
grpc.WithInsecure(r.cfg.Client.DialOption.Insecure),
grpc.WithErrGroup(r.eg),
)
switch Atoo(r.cfg.Method) {
case Insert:
r.loader, err = insert.New(insert.WithDataset(r.cfg.Dataset), insert.WithWriter(c))
case Search:
r.loader, err = search.New(search.WithDataset(r.cfg.Dataset), search.WithReader(c))
r.loader, err = insert.New(insert.WithAddr(r.cfg.Addr), insert.WithDataset(r.cfg.Dataset), insert.WithClient(r.client))
//case Search:
// r.loader, err = search.New(search.WithDataset(r.cfg.Dataset), search.WithReader(c))
default:
return fmt.Errorf("unsupported method")
}
Expand All @@ -62,7 +68,50 @@ func (r *run) PreStart(ctx context.Context) (err error) {
}

func (r *run) Start(ctx context.Context) (<-chan error, error) {
return r.loader.Do(ctx), nil
rech, err := r.client.StartConnectionMonitor(ctx)
if err != nil {
return nil, err
}
lech := r.loader.Do(ctx)
ech := make(chan error, 1000)
r.eg.Go(safety.RecoverFunc(func() (err error) {
defer close(ech)
finalize := func() (err error) {
var errs error
if r.client != nil {
err = r.client.Close()
if err != nil {
errs = errors.Wrap(errs, err.Error())
}
}
err = ctx.Err()
if err != nil && err != context.Canceled {
errs = errors.Wrap(errs, err.Error())
}
return errs
}
for {
select {
case <-ctx.Done():
return finalize()
case err = <-rech:
case err = <-lech:
if err != nil {
ech <- err
err = nil
}
}
if err != nil {
log.Error(err)
select {
case <-ctx.Done():
return finalize()
case ech <- err:
}
}
}
}))
return ech, nil
}

func (r *run) PreStop(ctx context.Context) error {
Expand Down

0 comments on commit 8c0bce0

Please sign in to comment.