Skip to content

Commit

Permalink
[patch] use Len and InsertVCacheLen method for IndexInfo / add mutex …
Browse files Browse the repository at this point in the history
…for (Create|Save)Index (#536)

* ♻️ use Len() and InsertVCacheLen() method for IndexInfo

Signed-off-by: Rintaro Okamura <rintaro.okamura@gmail.com>

* ♻️ add mutex for create-index & save-index

Signed-off-by: Rintaro Okamura <rintaro.okamura@gmail.com>

* 🔧 pass post_stop_timeout to sidecar observer service

Signed-off-by: Rintaro Okamura <rintaro.okamura@gmail.com>
  • Loading branch information
rinx committed Jul 2, 2020
1 parent 76908b6 commit 4205f6b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 8 deletions.
13 changes: 10 additions & 3 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/log/level"
"github.com/vdaas/vald/internal/params"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/timeutil/location"
ver "github.com/vdaas/vald/internal/version"
"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -160,7 +161,9 @@ func Run(ctx context.Context, run Runner, name string) (err error) {
}
case <-rctx.Done():
log.Info("executing daemon pre-stop function")
err = run.PreStop(ctx)
err = safety.RecoverFunc(func() error {
return run.PreStop(ctx)
})()
if err != nil {
log.Error(errors.ErrPreStopFunc(name, err))
if _, ok := emap[err.Error()]; !ok {
Expand All @@ -170,7 +173,9 @@ func Run(ctx context.Context, run Runner, name string) (err error) {
}

log.Info("executing daemon stop function")
err = run.Stop(ctx)
err = safety.RecoverFunc(func() error {
return run.Stop(ctx)
})()
if err != nil {
log.Error(errors.ErrStopFunc(name, err))
if _, ok := emap[err.Error()]; !ok {
Expand All @@ -180,7 +185,9 @@ func Run(ctx context.Context, run Runner, name string) (err error) {
}

log.Info("executing daemon post-stop function")
err = run.PostStop(ctx)
err = safety.RecoverFunc(func() error {
return run.PostStop(ctx)
})()
if err != nil {
log.Error(errors.ErrPostStopFunc(name, err))
if _, ok := emap[err.Error()]; !ok {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/core/ngt/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ func (s *server) IndexInfo(ctx context.Context, _ *payload.Empty) (res *payload.
}
}()
return &payload.Info_Index_Count{
Stored: uint32(len(s.ngt.UUIDs(ctx))),
Uncommitted: uint32(len(s.ngt.UncommittedUUIDs())),
Stored: uint32(s.ngt.Len()),
Uncommitted: uint32(s.ngt.InsertVCacheLen()),
Indexing: s.ngt.IsIndexing(),
}, nil
}
13 changes: 10 additions & 3 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type NGT interface {
type ngt struct {
alen int
indexing atomic.Value
saveMu sync.Mutex // creating or saving index
lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration
Expand Down Expand Up @@ -234,7 +235,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
err = nil
select {
case <-ctx.Done():
err = n.CreateAndSaveIndex(ctx, n.dps)
err = n.CreateIndex(ctx, n.dps)
if err != nil {
ech <- err
return errors.Wrap(ctx.Err(), err.Error())
Expand Down Expand Up @@ -454,6 +455,9 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
}
}()

n.saveMu.Lock()
defer n.saveMu.Unlock()

if n.indexing.Load().(bool) {
return nil
}
Expand Down Expand Up @@ -558,6 +562,9 @@ func (n *ngt) SaveIndex(ctx context.Context) (err error) {
}
}()

n.saveMu.Lock()
defer n.saveMu.Unlock()

if len(n.path) != 0 && !n.inMem {
eg, ctx := errgroup.New(ctx)
eg.Go(safety.RecoverFunc(func() error {
Expand Down Expand Up @@ -594,7 +601,7 @@ func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err erro
}()

err = n.CreateIndex(ctx, poolSize)
if err != nil {
if err != nil && err != errors.ErrUncommittedIndexNotFound {
return err
}
return n.SaveIndex(ctx)
Expand Down Expand Up @@ -667,7 +674,7 @@ func (n *ngt) DeleteVCacheLen() uint64 {

func (n *ngt) Close(ctx context.Context) (err error) {
if len(n.path) != 0 {
err = n.SaveIndex(ctx)
err = n.CreateAndSaveIndex(ctx, n.dps)
}
n.core.Close()
return
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/sidecar/usecase/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) {
so, err = observer.New(
observer.WithErrGroup(eg),
observer.WithBackupDuration(cfg.AgentSidecar.AutoBackupDuration),
observer.WithPostStopTimeout(cfg.AgentSidecar.PostStopTimeout),
observer.WithDir(cfg.AgentSidecar.WatchDir),
observer.WithBlobStorage(bs),
)
Expand Down

0 comments on commit 4205f6b

Please sign in to comment.