From 4205f6b1a9f3368d0cb5379c35d785605c8e860c Mon Sep 17 00:00:00 2001 From: Rintaro Okamura Date: Thu, 2 Jul 2020 13:39:40 +0900 Subject: [PATCH] [patch] use Len and InsertVCacheLen method for IndexInfo / add mutex for (Create|Save)Index (#536) * :recycle: use Len() and InsertVCacheLen() method for IndexInfo Signed-off-by: Rintaro Okamura * :recycle: add mutex for create-index & save-index Signed-off-by: Rintaro Okamura * :wrench: pass post_stop_timeout to sidecar observer service Signed-off-by: Rintaro Okamura --- internal/runner/runner.go | 13 ++++++++++--- pkg/agent/core/ngt/handler/grpc/handler.go | 4 ++-- pkg/agent/core/ngt/service/ngt.go | 13 ++++++++++--- pkg/agent/sidecar/usecase/sidecar/sidecar.go | 1 + 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 948344b11c..6ef7f70c5a 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -32,6 +32,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/log/level" "github.com/vdaas/vald/internal/params" + "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/timeutil/location" ver "github.com/vdaas/vald/internal/version" "go.uber.org/automaxprocs/maxprocs" @@ -160,7 +161,9 @@ func Run(ctx context.Context, run Runner, name string) (err error) { } case <-rctx.Done(): log.Info("executing daemon pre-stop function") - err = run.PreStop(ctx) + err = safety.RecoverFunc(func() error { + return run.PreStop(ctx) + })() if err != nil { log.Error(errors.ErrPreStopFunc(name, err)) if _, ok := emap[err.Error()]; !ok { @@ -170,7 +173,9 @@ func Run(ctx context.Context, run Runner, name string) (err error) { } log.Info("executing daemon stop function") - err = run.Stop(ctx) + err = safety.RecoverFunc(func() error { + return run.Stop(ctx) + })() if err != nil { log.Error(errors.ErrStopFunc(name, err)) if _, ok := emap[err.Error()]; !ok { @@ -180,7 +185,9 @@ func Run(ctx context.Context, run Runner, name string) (err error) { } log.Info("executing daemon post-stop function") - err = run.PostStop(ctx) + err = safety.RecoverFunc(func() error { + return run.PostStop(ctx) + })() if err != nil { log.Error(errors.ErrPostStopFunc(name, err)) if _, ok := emap[err.Error()]; !ok { diff --git a/pkg/agent/core/ngt/handler/grpc/handler.go b/pkg/agent/core/ngt/handler/grpc/handler.go index 806ffcc009..dc2895e26b 100644 --- a/pkg/agent/core/ngt/handler/grpc/handler.go +++ b/pkg/agent/core/ngt/handler/grpc/handler.go @@ -421,8 +421,8 @@ func (s *server) IndexInfo(ctx context.Context, _ *payload.Empty) (res *payload. } }() return &payload.Info_Index_Count{ - Stored: uint32(len(s.ngt.UUIDs(ctx))), - Uncommitted: uint32(len(s.ngt.UncommittedUUIDs())), + Stored: uint32(s.ngt.Len()), + Uncommitted: uint32(s.ngt.InsertVCacheLen()), Indexing: s.ngt.IsIndexing(), }, nil } diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index bbf4d81c2d..029e44a397 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -69,6 +69,7 @@ type NGT interface { type ngt struct { alen int indexing atomic.Value + saveMu sync.Mutex // creating or saving index lim time.Duration // auto indexing time limit dur time.Duration // auto indexing check duration sdur time.Duration // auto save index check duration @@ -234,7 +235,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { err = nil select { case <-ctx.Done(): - err = n.CreateAndSaveIndex(ctx, n.dps) + err = n.CreateIndex(ctx, n.dps) if err != nil { ech <- err return errors.Wrap(ctx.Err(), err.Error()) @@ -454,6 +455,9 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { } }() + n.saveMu.Lock() + defer n.saveMu.Unlock() + if n.indexing.Load().(bool) { return nil } @@ -558,6 +562,9 @@ func (n *ngt) SaveIndex(ctx context.Context) (err error) { } }() + n.saveMu.Lock() + defer n.saveMu.Unlock() + if len(n.path) != 0 && !n.inMem { eg, ctx := errgroup.New(ctx) eg.Go(safety.RecoverFunc(func() error { @@ -594,7 +601,7 @@ func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err erro }() err = n.CreateIndex(ctx, poolSize) - if err != nil { + if err != nil && err != errors.ErrUncommittedIndexNotFound { return err } return n.SaveIndex(ctx) @@ -667,7 +674,7 @@ func (n *ngt) DeleteVCacheLen() uint64 { func (n *ngt) Close(ctx context.Context) (err error) { if len(n.path) != 0 { - err = n.SaveIndex(ctx) + err = n.CreateAndSaveIndex(ctx, n.dps) } n.core.Close() return diff --git a/pkg/agent/sidecar/usecase/sidecar/sidecar.go b/pkg/agent/sidecar/usecase/sidecar/sidecar.go index dc3c93c933..b133fd1436 100644 --- a/pkg/agent/sidecar/usecase/sidecar/sidecar.go +++ b/pkg/agent/sidecar/usecase/sidecar/sidecar.go @@ -134,6 +134,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { so, err = observer.New( observer.WithErrGroup(eg), observer.WithBackupDuration(cfg.AgentSidecar.AutoBackupDuration), + observer.WithPostStopTimeout(cfg.AgentSidecar.PostStopTimeout), observer.WithDir(cfg.AgentSidecar.WatchDir), observer.WithBlobStorage(bs), )