From e887e9479b6ef41f31b0e496d1226f6b9668670f Mon Sep 17 00:00:00 2001 From: kpango Date: Mon, 12 Oct 2020 18:29:13 +0900 Subject: [PATCH 1/4] [patch] change gateway vald's mutex lock Signed-off-by: kpango --- pkg/gateway/vald/handler/grpc/handler.go | 73 ++++++++++++------------ 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/pkg/gateway/vald/handler/grpc/handler.go b/pkg/gateway/vald/handler/grpc/handler.go index 1b0dee760e..b2f4f8bee8 100644 --- a/pkg/gateway/vald/handler/grpc/handler.go +++ b/pkg/gateway/vald/handler/grpc/handler.go @@ -147,9 +147,9 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, eg.Go(safety.RecoverFunc(func() error { defer cancel() - // cl := new(checkList) - visited := make(map[string]bool, len(res.Results)) - mu := sync.RWMutex{} + // visited := make(map[string]bool, len(res.Results)) + // mu := sync.RWMutex{} + visited := new(sync.Map) return s.gateway.BroadCast(ectx, func(ctx context.Context, target string, ac agent.AgentClient, copts ...grpc.CallOption) error { r, err := f(ctx, ac, copts...) if err != nil { @@ -161,22 +161,21 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, return nil } id := dist.GetId() - mu.Lock() - if !visited[id] { - visited[id] = true - mu.Unlock() + // mu.RLock() + // already := visited[id] + // mu.RUnlock() + _, already := visited.LoadOrStore(id, struct{}{}) + if !already { + // mu.Lock() + // visited[id] = true + // mu.Unlock() dch <- dist - } else { - mu.Unlock() } - // if !cl.Exists(id) { - // dch <- dist - // cl.Check(id) - // } } return nil }) })) + // vl := make(map[string]struct{}, cap(res.GetResults())) for { select { case <-ectx.Done(): @@ -217,44 +216,46 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, } return res, nil case dist := <-dch: - if len(res.GetResults()) >= num { + pos := len(res.GetResults()) + if pos >= num { if dist.GetDistance() < math.Float32frombits(atomic.LoadUint32(&maxDist)) { atomic.StoreUint32(&maxDist, math.Float32bits(dist.GetDistance())) } else { continue } } - switch len(res.GetResults()) { + // if _, ok := vl[dist.GetId()]; ok { + // continue + // } + // vl[dist.GetId()] = struct{}{} + switch pos { case 0: res.Results = append(res.Results, dist) - continue case 1: if res.GetResults()[0].GetDistance() <= dist.GetDistance() { res.Results = append(res.Results, dist) } else { res.Results = append([]*payload.Object_Distance{dist}, res.Results[0]) } - continue - } - - pos := len(res.GetResults()) - for idx := pos; idx >= 1; idx-- { - if res.GetResults()[idx-1].GetDistance() <= dist.GetDistance() { - pos = idx - 1 - break + default: + for idx := pos; idx >= 1; idx-- { + if res.GetResults()[idx-1].GetDistance() <= dist.GetDistance() { + pos = idx - 1 + break + } + } + switch { + case pos == len(res.GetResults()): + res.Results = append([]*payload.Object_Distance{dist}, res.Results...) + case pos == len(res.GetResults())-1: + res.Results = append(res.GetResults(), dist) + case pos >= 0: + res.Results = append(res.GetResults()[:pos+1], res.GetResults()[pos:]...) + res.Results[pos+1] = dist + } + if len(res.GetResults()) > num && num != 0 { + res.Results = res.GetResults()[:num] } - } - switch { - case pos == len(res.GetResults()): - res.Results = append([]*payload.Object_Distance{dist}, res.Results...) - case pos == len(res.GetResults())-1: - res.Results = append(res.GetResults(), dist) - case pos >= 0: - res.Results = append(res.GetResults()[:pos+1], res.GetResults()[pos:]...) - res.Results[pos+1] = dist - } - if len(res.GetResults()) > num && num != 0 { - res.Results = res.GetResults()[:num] } } } From f9373fbe59949d94d44581ad8ea25e4a876e3b33 Mon Sep 17 00:00:00 2001 From: kpango Date: Tue, 13 Oct 2020 10:32:09 +0900 Subject: [PATCH 2/4] fix Signed-off-by: kpango --- pkg/gateway/vald/handler/grpc/handler.go | 41 +++++++++--------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/pkg/gateway/vald/handler/grpc/handler.go b/pkg/gateway/vald/handler/grpc/handler.go index b2f4f8bee8..f82b70b1c6 100644 --- a/pkg/gateway/vald/handler/grpc/handler.go +++ b/pkg/gateway/vald/handler/grpc/handler.go @@ -130,7 +130,8 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, span.End() } }() - maxDist := uint32(math.MaxUint32) + var maxDist uint32 + atomic.StoreUint32(&maxDist, math.Float32bits(math.MaxFloat32)) num := int(cfg.GetNum()) res = new(payload.Search_Response) res.Results = make([]*payload.Object_Distance, 0, s.gateway.GetAgentCount(ctx)*num) @@ -147,8 +148,6 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, eg.Go(safety.RecoverFunc(func() error { defer cancel() - // visited := make(map[string]bool, len(res.Results)) - // mu := sync.RWMutex{} visited := new(sync.Map) return s.gateway.BroadCast(ectx, func(ctx context.Context, target string, ac agent.AgentClient, copts ...grpc.CallOption) error { r, err := f(ctx, ac, copts...) @@ -157,25 +156,19 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, return nil } for _, dist := range r.GetResults() { - if dist.GetDistance() > math.Float32frombits(atomic.LoadUint32(&maxDist)) { + if dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) { return nil } - id := dist.GetId() - // mu.RLock() - // already := visited[id] - // mu.RUnlock() - _, already := visited.LoadOrStore(id, struct{}{}) - if !already { - // mu.Lock() - // visited[id] = true - // mu.Unlock() + if dist == nil{ + continue + } + if _, already := visited.LoadOrStore(dist.GetId(), struct{}{}); !already { dch <- dist } } return nil }) })) - // vl := make(map[string]struct{}, cap(res.GetResults())) for { select { case <-ectx.Done(): @@ -217,17 +210,9 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, return res, nil case dist := <-dch: pos := len(res.GetResults()) - if pos >= num { - if dist.GetDistance() < math.Float32frombits(atomic.LoadUint32(&maxDist)) { - atomic.StoreUint32(&maxDist, math.Float32bits(dist.GetDistance())) - } else { - continue - } + if pos >= num && dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) { + continue } - // if _, ok := vl[dist.GetId()]; ok { - // continue - // } - // vl[dist.GetId()] = struct{}{} switch pos { case 0: res.Results = append(res.Results, dist) @@ -253,8 +238,14 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, res.Results = append(res.GetResults()[:pos+1], res.GetResults()[pos:]...) res.Results[pos+1] = dist } - if len(res.GetResults()) > num && num != 0 { + pos = len(res.GetResults()) + if pos > num && num != 0 { res.Results = res.GetResults()[:num] + pos = len(res.GetResults()) + } + if distEnd := res.GetResults()[pos].GetDistance(); pos >= num && + distEnd < math.Float32frombits(atomic.LoadUint32(&maxDist)) { + atomic.StoreUint32(&maxDist, math.Float32bits(distEnd)) } } } From 0cc61130527fd5bf662bb701fdd506a78f6c1fe3 Mon Sep 17 00:00:00 2001 From: kpango Date: Tue, 13 Oct 2020 11:18:07 +0900 Subject: [PATCH 3/4] fix Signed-off-by: kpango --- pkg/gateway/vald/handler/grpc/handler.go | 34 +++++++++++++----------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/gateway/vald/handler/grpc/handler.go b/pkg/gateway/vald/handler/grpc/handler.go index f82b70b1c6..0456204009 100644 --- a/pkg/gateway/vald/handler/grpc/handler.go +++ b/pkg/gateway/vald/handler/grpc/handler.go @@ -159,7 +159,7 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, if dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) { return nil } - if dist == nil{ + if dist == nil { continue } if _, already := visited.LoadOrStore(dist.GetId(), struct{}{}); !already { @@ -180,8 +180,8 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, if len(res.GetResults()) > num && num != 0 { res.Results = res.Results[:num] } - uuids := make([]string, 0, len(res.Results)) - for _, r := range res.Results { + uuids := make([]string, 0, len(res.GetResults())) + for _, r := range res.GetResults() { uuids = append(uuids, r.GetId()) } if s.metadata != nil { @@ -209,11 +209,11 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, } return res, nil case dist := <-dch: - pos := len(res.GetResults()) - if pos >= num && dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) { + rl := len(res.GetResults()) + if rl >= num && dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) { continue } - switch pos { + switch rl { case 0: res.Results = append(res.Results, dist) case 1: @@ -223,12 +223,14 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, res.Results = append([]*payload.Object_Distance{dist}, res.Results[0]) } default: - for idx := pos; idx >= 1; idx-- { + var pos int + for idx := rl; idx >= 1; idx-- { if res.GetResults()[idx-1].GetDistance() <= dist.GetDistance() { pos = idx - 1 break } } + switch { case pos == len(res.GetResults()): res.Results = append([]*payload.Object_Distance{dist}, res.Results...) @@ -238,15 +240,15 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, res.Results = append(res.GetResults()[:pos+1], res.GetResults()[pos:]...) res.Results[pos+1] = dist } - pos = len(res.GetResults()) - if pos > num && num != 0 { - res.Results = res.GetResults()[:num] - pos = len(res.GetResults()) - } - if distEnd := res.GetResults()[pos].GetDistance(); pos >= num && - distEnd < math.Float32frombits(atomic.LoadUint32(&maxDist)) { - atomic.StoreUint32(&maxDist, math.Float32bits(distEnd)) - } + } + rl = len(res.GetResults()) + if rl > num && num != 0 { + res.Results = res.GetResults()[:num] + rl = len(res.GetResults()) + } + if distEnd := res.GetResults()[rl].GetDistance(); rl >= num && + distEnd < math.Float32frombits(atomic.LoadUint32(&maxDist)) { + atomic.StoreUint32(&maxDist, math.Float32bits(distEnd)) } } } From 44d4bb3bea714f39e22a3f086f080d1a62a69889 Mon Sep 17 00:00:00 2001 From: Yusuke Kato Date: Tue, 13 Oct 2020 11:22:46 +0900 Subject: [PATCH 4/4] Update pkg/gateway/vald/handler/grpc/handler.go Co-authored-by: Rintaro Okamura --- pkg/gateway/vald/handler/grpc/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/vald/handler/grpc/handler.go b/pkg/gateway/vald/handler/grpc/handler.go index 0456204009..b348070911 100644 --- a/pkg/gateway/vald/handler/grpc/handler.go +++ b/pkg/gateway/vald/handler/grpc/handler.go @@ -209,7 +209,7 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, } return res, nil case dist := <-dch: - rl := len(res.GetResults()) + rl := len(res.GetResults()) // result length if rl >= num && dist.GetDistance() >= math.Float32frombits(atomic.LoadUint32(&maxDist)) { continue }