diff --git a/internal/core/algorithm/ngt/ngt.go b/internal/core/algorithm/ngt/ngt.go
index 51454a3c544..9ffb8a153c1 100644
--- a/internal/core/algorithm/ngt/ngt.go
+++ b/internal/core/algorithm/ngt/ngt.go
@@ -702,14 +702,21 @@ func (n *ngt) SaveIndexWithPath(idxPath string) error {
 
 // Remove removes from NGT index.
 func (n *ngt) Remove(id uint) error {
+	log.Debugf("[CHECK20240220]\tcore.Remove started\t oid: %d,\t", id)
 	ne := n.GetErrorBuffer()
+	log.Debugf("[CHECK20240220]\tcore.Remove get ebuf\toid: %d,\t", id)
+	log.Debugf("[CHECK20240220]\tcore.Remove pre-lock\toid: %d,\t", id)
 	n.lock(true)
+	log.Debugf("[CHECK20240220]\tcore.Remove locked\toid: %d,\t", id)
 	ret := C.ngt_remove_index(n.index, C.ObjectID(id), ne.err)
+	log.Debugf("[CHECK20240220]\tcore.Remove pre-unlock\toid: %d,\t", id)
 	n.unlock(true)
+	log.Debugf("[CHECK20240220]\tcore.Remove unlocked\toid: %d,\t", id)
 	if ret == ErrorCode {
 		return n.newGoError(ne)
 	}
 	n.PutErrorBuffer(ne)
+	log.Debugf("[CHECK20240220]\tcore.Remove return ebuf\toid: %d,\t", id)
 
 	n.cnt.Add(^uint64(0))
 
@@ -730,12 +737,18 @@ func (n *ngt) BulkRemove(ids ...uint) (errs error) {
 // GetVector returns vector stored in NGT index.
 func (n *ngt) GetVector(id uint) (ret []float32, err error) {
 	dimension := int(n.dimension)
+	log.Debugf("[CHECK20240220]\tcore.GetVector started\t oid: %d,\t", id)
 	ne := n.GetErrorBuffer()
+	log.Debugf("[CHECK20240220]\tcore.GetVector get ebuf\toid: %d,\t", id)
 	switch n.objectType {
 	case Float:
+		log.Debugf("[CHECK20240220]\tcore.GetVector pre-lock\toid: %d,\t", id)
 		n.rLock(false)
+		log.Debugf("[CHECK20240220]\tcore.GetVector locked\toid: %d,\t", id)
 		results := C.ngt_get_object_as_float(n.ospace, C.ObjectID(id), ne.err)
+		log.Debugf("[CHECK20240220]\tcore.GetVector pre-unlock\toid: %d,\t", id)
 		n.rUnlock(false)
+		log.Debugf("[CHECK20240220]\tcore.GetVector unlocked\toid: %d,\t", id)
 		if results == nil {
 			return nil, n.newGoError(ne)
 		}
@@ -768,6 +781,7 @@ func (n *ngt) GetVector(id uint) (ret []float32, err error) {
 		return nil, errors.ErrUnsupportedObjectType
 	}
 	n.PutErrorBuffer(ne)
+	log.Debugf("[CHECK20240220]\tcore.GetVector return ebuf\toid: %d,\t", id)
 	return ret, nil
 }
 
diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go
index 4429d4ab7bc..ee1588cf16f 100644
--- a/pkg/agent/core/ngt/service/ngt.go
+++ b/pkg/agent/core/ngt/service/ngt.go
@@ -1274,12 +1274,15 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
 }
 
 func (n *ngt) removeInvalidIndex(ctx context.Context) {
+	log.Debug("[CHECK20240220]\tremoveInvalidIndex started")
 	if n.kvs.Len() == 0 {
+		log.Debug("[CHECK20240220]\tremoveInvalidIndex kvs Len is empty")
 		return
 	}
 	var dcnt uint32
 	n.kvs.Range(ctx, func(uuid string, oid uint32, _ int64) bool {
 		vec, err := n.core.GetVector(uint(oid))
+		log.Debugf("[CHECK20240220]\tuuid: %s,\toid: %d,\terr: %v", uuid, oid, err)
 		if err != nil || vec == nil || len(vec) != n.dim {
 			log.Debugf("invalid index detected err: %v\tuuid: %s\toid: %d will remove", err, uuid, oid)
 			n.kvs.Delete(uuid)
@@ -1295,6 +1298,7 @@ func (n *ngt) removeInvalidIndex(ctx context.Context) {
 		return true
 	})
 	if atomic.LoadUint32(&dcnt) <= 0 {
+		log.Debug("[CHECK20240220]\tremoveInvalidIndex deleted count is empty")
 		return
 	}
 	var poolSize uint32
@@ -1303,6 +1307,9 @@ func (n *ngt) removeInvalidIndex(ctx context.Context) {
 	} else {
 		poolSize = atomic.LoadUint32(&dcnt)
 	}
+	log.Debug("[CHECK20240220]\tremoveInvalidIndex before cimu.Lock")
 	n.cimu.Lock()
+	// Logged right after Lock returns (not deferred) so a stall on cimu can be told apart from a slow critical section.
+	log.Debug("[CHECK20240220]\tremoveInvalidIndex after cimu.Lock")
 	defer n.cimu.Unlock()
 	n.indexing.Store(true)
diff --git a/pkg/agent/internal/kvs/kvs.go b/pkg/agent/internal/kvs/kvs.go
index f5dedb13dc5..1b7f0bc2211 100644
--- a/pkg/agent/internal/kvs/kvs.go
+++ b/pkg/agent/internal/kvs/kvs.go
@@ -20,6 +20,8 @@ import (
 	"context"
 	"sync/atomic"
 
+	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/log"
 	"github.com/vdaas/vald/internal/safety"
 	"github.com/vdaas/vald/internal/sync"
 	"github.com/vdaas/vald/internal/sync/errgroup"
@@ -154,21 +156,38 @@ func (b *bidi) Range(ctx context.Context, f func(string, uint32, int64) bool) {
 	var wg sync.WaitGroup
 	for i := range b.uo {
 		idx := i
+		log.Debugf("[CHECK20240220]\tkvs.Range In-Loop idx: %d", idx)
 		wg.Add(1)
+		log.Debugf("[CHECK20240220]\tkvs.Range wg.Add idx: %d", idx)
 		b.eg.Go(safety.RecoverFunc(func() (err error) {
+			log.Debugf("[CHECK20240220]\tkvs.Range inner-function idx: %d", idx)
+			defer log.Debugf("[CHECK20240220]\tkvs.Range inner-function defer wg.Done idx: %d", idx)
+			// Deferred so wg.Wait cannot block forever if the callback panics.
+			defer wg.Done()
 			b.uo[idx].Range(func(uuid string, val ValueStructUo) bool {
+				log.Debugf("[CHECK20240220]\tkvs.Range.uo[%d].Range", idx)
 				select {
 				case <-ctx.Done():
+					log.Debugf("[CHECK20240220]\tkvs.Range.uo[%d].Range context Done", idx)
+					err = ctx.Err()
 					return false
 				default:
+					log.Debugf("[CHECK20240220]\tkvs.Range.uo[%d].Range exec Function", idx)
 					return f(uuid, val.value, val.timestamp)
 				}
 			})
-			wg.Done()
+			// err is only assigned above, when the walk stopped because ctx was done.
+			if err != nil &&
+				(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+				log.Debugf("[CHECK20240220]\tkvs.Range return context error idx: %d", idx)
+				return err
+			}
+			log.Debugf("[CHECK20240220]\tkvs.Range return no error idx: %d", idx)
 			return nil
 		}))
 	}
 	wg.Wait()
+	log.Debug("[CHECK20240220]\tkvs.Range finished")
 }
 
 // Len returns the length of the cache that is set in the bidi.