Skip to content

Commit

Permalink
use errgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
kmrmt committed May 20, 2020
1 parent 8c0bce0 commit 7d348f5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
3 changes: 2 additions & 1 deletion cmd/cli/loadtest/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ logging:
method: insert
dataset: fashion-mnist
concurrency: 100
addr: "gateway.vald.dssk-01.vald.k8s.bp.dssk01.caas.ssk.zcp.yahoo.co.jp:80"
#addr: "gateway.vald.dssk-01.vald.k8s.bp.dssk01.caas.ssk.zcp.yahoo.co.jp:80"
addr: "localhost:8081"
client:
addrs: []
health_check_duration: 1s
Expand Down
26 changes: 10 additions & 16 deletions pkg/tools/cli/loadtest/service/insert/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"fmt"
"reflect"
"sync"

"github.com/vdaas/vald/apis/grpc/gateway/vald"
"github.com/vdaas/vald/apis/grpc/payload"
Expand Down Expand Up @@ -62,7 +61,7 @@ func (i *insert) Prepare(ctx context.Context) error {
return err
}
vectors := dataset.Train()
ids := dataset.IDs()
ids := assets.CreateRandomIDs(len(vectors))
i.req = make([]*payload.Object_Vector, len(vectors))
for j, v := range vectors {
i.req[j] = &payload.Object_Vector{
Expand All @@ -78,27 +77,22 @@ func (i *insert) Do(ctx context.Context) <-chan error {
log.Debugf("insert %d items", len(i.req))
i.eg.Go(safety.RecoverFunc(func() error {
defer close(errCh)
wg := new(sync.WaitGroup)
sem := make(chan struct{}, i.concurrency)
eg, egctx := errgroup.New(ctx)
eg.Limitation(i.concurrency)
for _, req := range i.req {
wg.Add(1)
sem <- struct{}{}
go func(r *payload.Object_Vector) {
defer wg.Done()
defer func() {
<-sem
}()
_, err := i.client.Do(ctx, i.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
_, err := vald.NewValdClient(conn).Insert(ctx, req, copts...)
r := req
eg.Go(func() error {
_, err := i.client.Do(egctx, i.addr, func(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) (interface{}, error) {
_, err := vald.NewValdClient(conn).Insert(ctx, r, copts...)
return nil, err
})
if err != nil {
errCh <- err
}
}(req)
return nil
})
}
wg.Wait()
return nil
return eg.Wait()
}))
return errCh
}

0 comments on commit 7d348f5

Please sign in to comment.