Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <i.can.feel.gravity@gmail.com>
  • Loading branch information
kpango committed Oct 13, 2020
1 parent e887e94 commit f9373fb
Showing 1 changed file with 16 additions and 25 deletions.
41 changes: 16 additions & 25 deletions pkg/gateway/vald/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config,
span.End()
}
}()
maxDist := uint32(math.MaxUint32)
var maxDist uint32
atomic.StoreUint32(&maxDist, math.Float32bits(math.MaxFloat32))
num := int(cfg.GetNum())
res = new(payload.Search_Response)
res.Results = make([]*payload.Object_Distance, 0, s.gateway.GetAgentCount(ctx)*num)
Expand All @@ -147,8 +148,6 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config,

eg.Go(safety.RecoverFunc(func() error {
defer cancel()
// visited := make(map[string]bool, len(res.Results))
// mu := sync.RWMutex{}
visited := new(sync.Map)
return s.gateway.BroadCast(ectx, func(ctx context.Context, target string, ac agent.AgentClient, copts ...grpc.CallOption) error {
r, err := f(ctx, ac, copts...)
Expand All @@ -157,25 +156,19 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config,
return nil
}
for _, dist := range r.GetResults() {
if dist.GetDistance() > math.Float32frombits(atomic.LoadUint32(&maxDist)) {
if dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) {
return nil
}
id := dist.GetId()
// mu.RLock()
// already := visited[id]
// mu.RUnlock()
_, already := visited.LoadOrStore(id, struct{}{})
if !already {
// mu.Lock()
// visited[id] = true
// mu.Unlock()
if dist == nil{
continue
}
if _, already := visited.LoadOrStore(dist.GetId(), struct{}{}); !already {
dch <- dist
}
}
return nil
})
}))
// vl := make(map[string]struct{}, cap(res.GetResults()))
for {
select {
case <-ectx.Done():
Expand Down Expand Up @@ -217,17 +210,9 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config,
return res, nil
case dist := <-dch:
pos := len(res.GetResults())
if pos >= num {
if dist.GetDistance() < math.Float32frombits(atomic.LoadUint32(&maxDist)) {
atomic.StoreUint32(&maxDist, math.Float32bits(dist.GetDistance()))
} else {
continue
}
if pos >= num && dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) {
continue
}
// if _, ok := vl[dist.GetId()]; ok {
// continue
// }
// vl[dist.GetId()] = struct{}{}
switch pos {
case 0:
res.Results = append(res.Results, dist)
Expand All @@ -253,8 +238,14 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config,
res.Results = append(res.GetResults()[:pos+1], res.GetResults()[pos:]...)
res.Results[pos+1] = dist
}
if len(res.GetResults()) > num && num != 0 {
pos = len(res.GetResults())
if pos > num && num != 0 {
res.Results = res.GetResults()[:num]
pos = len(res.GetResults())
}
if distEnd := res.GetResults()[pos].GetDistance(); pos >= num &&
distEnd < math.Float32frombits(atomic.LoadUint32(&maxDist)) {
atomic.StoreUint32(&maxDist, math.Float32bits(distEnd))
}
}
}
Expand Down

0 comments on commit f9373fb

Please sign in to comment.