Skip to content

Commit

Permalink
resolve kvs already closed before last saving
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Feb 20, 2024
1 parent f080436 commit c049ea2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 22 deletions.
2 changes: 0 additions & 2 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,7 @@ func (n *ngt) Remove(id uint) error {
return n.newGoError(ne)
}
n.PutErrorBuffer(ne)

n.cnt.Add(^uint64(0))

return nil
}

Expand Down
14 changes: 12 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,8 +1741,18 @@ func (n *ngt) GetDimensionSize() int {

func (n *ngt) Close(ctx context.Context) (err error) {
defer n.core.Close()

err = n.kvs.Close()
defer func() {
kerr := n.kvs.Close()
if kerr != nil &&
!errors.Is(err, context.Canceled) &&
!errors.Is(err, context.DeadlineExceeded) {
if err != nil {
err = errors.Join(kerr, err)
} else {
err = kerr
}
}
}()
if len(n.path) != 0 {
if n.isReadReplica {
log.Info("skip create and save index operation on close because this is read replica")
Expand Down
32 changes: 14 additions & 18 deletions pkg/agent/internal/kvs/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"sync/atomic"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"

Check failure on line 24 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

"github.com/vdaas/vald/internal/log" imported and not used

Check failure on line 24 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

"github.com/vdaas/vald/internal/log" imported and not used
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down Expand Up @@ -53,7 +55,6 @@ type bidi struct {
l uint64
ou [slen]*sync.Map[uint32, valueStructOu]
uo [slen]*sync.Map[string, ValueStructUo]
eg errgroup.Group
}

const (
Expand All @@ -79,14 +80,6 @@ func New(opts ...Option) BidiMap {
b.uo[i] = new(sync.Map[string, ValueStructUo])
}

if b.eg == nil {
b.eg, _ = errgroup.New(context.Background())
}

if b.concurrency > 0 {
b.eg.SetLimit(b.concurrency)
}

return b
}

Expand Down Expand Up @@ -151,24 +144,30 @@ func (b *bidi) DeleteInverse(val uint32) (key string, ok bool) {

// Range retrieves all set keys and values and calls the callback function f.
func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
var wg sync.WaitGroup
eg, ctx = errgroup.New(ctx)

Check failure on line 147 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 147 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
if b.concurrency > 0 {
eg.SetLimit(b.concurrency)

Check failure on line 149 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 149 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
}
for i := range b.uo {
idx := i
wg.Add(1)
b.eg.Go(safety.RecoverFunc(func() (err error) {
eg.Go(safety.RecoverFunc(func() (err error) {

Check failure on line 153 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 153 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
b.uo[idx].Range(func(uuid string, val ValueStructUo) bool {
select {
case <-ctx.Done():
err = ctx.Err()
return false
default:
return f(uuid, val.value, val.timestamp)
}
})
wg.Done()
if err != nil &&
(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
return err
}
return nil
}))
}
wg.Wait()
eg.Wait()

Check failure on line 170 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-sequential

undefined: eg

Check failure on line 170 in pkg/agent/internal/kvs/kvs.go

View workflow job for this annotation

GitHub Actions / grpc-stream

undefined: eg
}

// Len returns the length of the cache that is set in the bidi.
Expand All @@ -180,10 +179,7 @@ func (b *bidi) Len() uint64 {
}

func (b *bidi) Close() error {
if b == nil {
return nil
}
return b.eg.Wait()
return nil
}

func getShardID(key string) (id uint64) {
Expand Down

0 comments on commit c049ea2

Please sign in to comment.