Skip to content

Commit

Permalink
add insert vq removal logic when delete vq added
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Feb 20, 2021
1 parent 2d94bcb commit 3e44b48
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/core/ngt/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ ngt:
default_epsilon: 0.01
default_pool_size: 100
default_radius: -1
dimension: 6
dimension: 100
distance_type: l2
enable_in_memory_mode: true
enable_proactive_gc: true
Expand Down
32 changes: 30 additions & 2 deletions pkg/agent/core/ngt/service/vqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type vqueue struct {
dmu sync.Mutex
eg errgroup.Group
finalizing atomic.Value
closed atomic.Value
}

type index struct {
Expand All @@ -76,12 +77,15 @@ func New(eg errgroup.Group) Queue {
eg: eg,
}
vq.finalizing.Store(false)
vq.closed.Store(true)
return vq
}

func (v *vqueue) Start(ctx context.Context) (<-chan error, error) {
ech := make(chan error, 1)
v.eg.Go(safety.RecoverFunc(func() (err error) {
v.closed.Store(false)
defer v.closed.Store(true)
defer close(ech)
for {
select {
Expand All @@ -108,7 +112,8 @@ func (v *vqueue) Start(ctx context.Context) (<-chan error, error) {
}

func (v *vqueue) PushInsert(uuid string, vector []float32, date int64) error {
if v.finalizing.Load().(bool) {
// we have to check this instance's channel bypass daemon is finalizing or not, if in finalizing process we should not send new index to channel
if v.finalizing.Load().(bool) || v.closed.Load().(bool) {
return errors.ErrVQueueFinalizing
}
if date == 0 {
Expand All @@ -124,7 +129,8 @@ func (v *vqueue) PushInsert(uuid string, vector []float32, date int64) error {
}

func (v *vqueue) PushDelete(uuid string, date int64) error {
if v.finalizing.Load().(bool) {
// we have to check this instance's channel bypass daemon is finalizing or not, if in finalizing process we should not send new index to channel
if v.finalizing.Load().(bool) || v.closed.Load().(bool) {
return errors.ErrVQueueFinalizing
}
if date == 0 {
Expand Down Expand Up @@ -201,6 +207,28 @@ func (v *vqueue) addDelete(d key) {
v.dmu.Lock()
v.udk = append(v.udk, d)
v.dmu.Unlock()

// we should check insert vqueue if insert vqueue exists and delete operation date is newer than insert operation date then we should remove insert vqueue's data.
v.imu.Lock()
_, ok := v.uiil[d.uuid]
if ok {
dl := make([]int, 0, len(v.uii))
for i, idx := range v.uii {
// check same uuid & operation date
// if date is equal, it may update operation we shouldn't remove at that time
if idx.uuid == d.uuid && d.date > idx.date {
dl = append(dl, i)
}
}
sort.Sort(sort.Reverse(sort.IntSlice(dl)))
for _, i := range dl {
// remove unnecessary insert vector queue data
v.uii = append(v.uii[:i], v.uii[i+1:]...)
}
// remove from existing map
delete(v.uiil, d.uuid)
}
v.imu.Unlock()
}

func (v *vqueue) popInsert() (i index) {
Expand Down

0 comments on commit 3e44b48

Please sign in to comment.