Skip to content

Commit

Permalink
temporally implement two versions of correct function
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Aug 17, 2023
1 parent ed13624 commit 568279f
Showing 1 changed file with 96 additions and 1 deletion.
97 changes: 96 additions & 1 deletion pkg/index/job/correction/service/corrector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/vdaas/vald/internal/slices"
valdsync "github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/pkg/index/job/correction/config"
stdeg "golang.org/x/sync/errgroup"
)

type Corrector interface {
Expand Down Expand Up @@ -107,10 +108,104 @@ func (c *correct) Start(ctx context.Context) (<-chan error, error) {
}

func (c *correct) correct(ctx context.Context, addrs []string) (err error) {
if err := c.discoverer.GetClient().OrderedRange(ctx, addrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
vc := vald.NewValdClient(conn)
stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
if err != nil {
return err
}

seg, ctx := stdeg.WithContext(ctx)
seg.SetLimit(100) // FIXME: server settingsをそのまま流用で良いのか?

finalize := func() error {
err = seg.Wait()
if err != nil {
log.Errorf("err group returned error: %v", err)
return err
}
log.Infof("correction finished for agent %s", addr)
return nil
}
defer finalize()

streamEnd := make(chan struct{})
var once sync.Once
var mu sync.Mutex
// これをさらにerrgroupで囲みたくなるが、さすがに頭がおかしくなりそう
// 事前にRecvすべき件数はわかるのだからその回数だけfor文を回すようにする方がいいか
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-streamEnd:
return nil
default:
// TODO: when vald internal errgroup is changed to block when eg limitation is reached,
// switch to vald version of errgroup.
seg.Go(func() error {
mu.Lock()
// As long as we don't stream.Recv() from the stream, we do not consume the memory of the message.
// So by limiting the number of this errgroup.Go instances, we can limit the memory usage
// https://github.com/grpc/grpc-go/blob/33f9fa2e6e5bcf4cf8fe45133e23779ae6e43f6c/rpc_util.go#L795
res, err := stream.Recv()
mu.Unlock()

if errors.Is(err, io.EOF) {
log.Debugf("StreamListObject stream finished for agent %s", addr)
once.Do(func() {
close(streamEnd)
})
return nil
}
if err != nil {
log.Errorf("StreamListObject stream finished unexpectedly: %v", err)
return err
}

if res.GetVector() == nil {
st := res.GetStatus()
log.Error(st.GetCode(), st.GetMessage(), st.GetDetails())
// continue
return nil
}

log.Debugf("received object in StreamListObject: agent(%s), id(%s), timestamp(%v)", addr, res.GetVector().GetId(), res.GetVector().GetTimestamp())
if err := c.checkConsistency(
ctx,
&vectorReplica{
addr: addr,
vec: res.GetVector(),
},
addrs,
); err != nil {
// TODO: valdとstdでerrorの処理が違うので注意
// (valdはerrが着信するまでにスタートしていた処理は行われる)
// (stdはerrが着信すると他は全て止まる)
log.Errorf("failed to check consistency: %v", err)
return nil // continue other processes
}

return nil
})
}
}
},
); err != nil {
log.Errorf("failed to range over agents(%v): %v", addrs, err)
return err
}

return nil
}

// stream.Recvぶん回しバージョン。メモリ使用率が高くなる可能性があるのでできれば避けたい。パフォーマンスとしては理論上最も良いはず
func (c *correct) correct2(ctx context.Context, addrs []string) (err error) {
if err := c.discoverer.GetClient().OrderedRange(ctx, addrs,
func(ctx context.Context, addr string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
eg, ctx := errgroup.New(ctx)
eg.Limitation(c.cfg.Server.GetGRPCStreamConcurrency())
eg.Limitation(100)

vc := vald.NewValdClient(conn)
stream, err := vc.StreamListObject(ctx, &payload.Object_List_Request{})
Expand Down

0 comments on commit 568279f

Please sign in to comment.