From f3f6635e6c2e2e1b0520af2cdc0b31c0a17e0bef Mon Sep 17 00:00:00 2001 From: kpango Date: Sun, 17 Jan 2021 13:44:52 +0900 Subject: [PATCH 1/6] os.free nil pointer failure in ngt cgo due to create index hang up Signed-off-by: kpango --- Makefile | 2 ++ Makefile.d/client.mk | 4 ++-- README.md | 2 +- cmd/agent/core/ngt/sample.yaml | 2 +- internal/core/algorithm/ngt/ngt.go | 20 +++++--------------- internal/net/grpc/stream.go | 5 ++++- internal/runner/runner.go | 18 +++++++++++++++++- pkg/agent/core/ngt/service/ngt.go | 4 ++-- pkg/gateway/lb/usecase/vald.go | 1 + 9 files changed, 35 insertions(+), 23 deletions(-) diff --git a/Makefile b/Makefile index 8216a53bd1..d246440894 100644 --- a/Makefile +++ b/Makefile @@ -149,6 +149,8 @@ PORT ?= 80 NUMBER ?= 10 DIMENSION ?= 6 NUMPANES ?= 4 +MEAN ?= 0.0 +STDDEV ?= 1.0 BODY = "" diff --git a/Makefile.d/client.mk b/Makefile.d/client.mk index d046261ad1..c0c25e6779 100644 --- a/Makefile.d/client.mk +++ b/Makefile.d/client.mk @@ -37,9 +37,9 @@ endif .PHONY: valdcli/xpanes/insert ## insert randomized vectors using valdcli and xpanes valdcli/xpanes/insert: - xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-insert --elapsed-time" $$(seq 1 $(NUMPANES)) + xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-insert --elapsed-time" $$(seq 1 $(NUMPANES)) .PHONY: valdcli/xpanes/search ## search randomized vectors using valdcli and xpanes valdcli/xpanes/search: - xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES)) + xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES)) diff --git a/README.md b/README.md index 1666b528a1..e1ea6cb450 100755 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![License: Apache 2.0](https://img.shields.io/github/license/vdaas/vald.svg?style=flat-square)](https://opensource.org/licenses/Apache-2.0) [![release](https://img.shields.io/github/release/vdaas/vald.svg?style=flat-square)](https://github.com/vdaas/vald/releases/latest) -[![GoDoc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat-square)](https://pkg.go.dev/github.com/vdaas/vald) +[![Go Reference](https://pkg.go.dev/badge/github.com/vdaas/vald.svg)](https://pkg.go.dev/github.com/vdaas/vald) [![Codacy Badge](https://img.shields.io/codacy/grade/a6e544eee7bc49e08a000bb10ba3deed?style=flat-square)](https://www.codacy.com/app/i.can.feel.gravity/vald?utm_source=github.com&utm_medium=referral&utm_content=vdaas/vald&utm_campaign=Badge_Grade) [![Go Report Card](https://goreportcard.com/badge/github.com/vdaas/vald?style=flat-square)](https://goreportcard.com/report/github.com/vdaas/vald) [![DepShield Badge](https://depshield.sonatype.org/badges/vdaas/vald/depshield.svg?style=flat-square)](https://depshield.github.io) diff --git a/cmd/agent/core/ngt/sample.yaml b/cmd/agent/core/ngt/sample.yaml index fd689b1b8e..cfca962b82 100644 --- a/cmd/agent/core/ngt/sample.yaml +++ b/cmd/agent/core/ngt/sample.yaml @@ -154,7 +154,7 @@ ngt: default_epsilon: 0.01 default_pool_size: 10000 default_radius: -1 - dimension: 4096 + dimension: 6 distance_type: l2 enable_in_memory_mode: true enable_proactive_gc: true diff --git a/internal/core/algorithm/ngt/ngt.go b/internal/core/algorithm/ngt/ngt.go index 617b50ed31..f119b9c294 100644 --- a/internal/core/algorithm/ngt/ngt.go +++ b/internal/core/algorithm/ngt/ngt.go @@ -393,24 +393,14 @@ func (n *ngt) BulkInsert(vecs [][]float32) ([]uint, []error) { ids := make([]uint, 0, len(vecs)) errs := make([]error, 0, len(vecs)) - dim := int(n.dimension) - var id uint - n.mu.Lock() - for _, vec := range vecs { - id = 0 - if len(vec) != dim { - errs = append(errs, errors.ErrIncompatibleDimensionSize(len(vec), dim)) - } else { - // n.mu.Lock() - id = uint(C.ngt_insert_index_as_float(n.index, (*C.float)(&vec[0]), C.uint32_t(n.dimension), n.ebuf)) - // n.mu.Unlock() - if id == 0 { - errs = append(errs, n.newGoError(n.ebuf)) - } + log.Infof("started to bulk insert %d of vectors", len(vecs)) + for i, vec := range vecs { + id, err := n.Insert(vec) + if err != nil { + errs = append(errs, errors.Wrapf(err, "bulkinsert error detected index number: %d,\tid: %d", i, id)) } ids = append(ids, id) } - n.mu.Unlock() return ids, errs } diff --git a/internal/net/grpc/stream.go b/internal/net/grpc/stream.go index 167c1d4d1d..4169a0eb0e 100644 --- a/internal/net/grpc/stream.go +++ b/internal/net/grpc/stream.go @@ -87,6 +87,9 @@ func BidirectionalStream(ctx context.Context, stream grpc.ServerStream, gerrs.Roots = append(gerrs.Roots, gerr) return true }) + if errs == nil { + return nil + } st, err := status.New(status.Unknown, errs.Error()).WithDetails(gerrs) if err != nil { log.Warn(err) @@ -105,7 +108,7 @@ func BidirectionalStream(ctx context.Context, stream grpc.ServerStream, if err == io.EOF { return finalize() } - log.Error(err) + log.Errorf("failed to receive stream message %v", err) continue } if data != nil { diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 1ba3189c1b..a7646bf55d 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -26,6 +26,7 @@ import ( "syscall" "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/encoding/json" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/info" @@ -95,6 +96,21 @@ func Do(ctx context.Context, opts ...Option) error { log.Init() } + log.Debugf("version info:\t\t%s\n\nconfiguration:\t\t%s\n\n", + func() string { + b, err := json.Marshal(info.Get()) + if err != nil { + return "failed to serialize build information" + } + return string(b) + }(), func() string { + b, err := json.Marshal(cfg) + if err != nil { + return "failed to serialize configuration" + } + return string(b) + }()) + // set location temporary for initialization logging location.Set(ccfg.TZ) @@ -197,7 +213,7 @@ func Run(ctx context.Context, run Runner, name string) (err error) { } err = errgroup.Wait() - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { log.Error(errors.ErrRunnerWait(name, err)) if _, ok := emap[err.Error()]; !ok { errs = append(errs, err) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index b72f6c6b21..faa178affa 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -340,7 +340,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { select { case <-ctx.Done(): err = n.CreateIndex(ctx, n.poolSize) - if err != nil { + if err != nil && !errors.Is(err, errors.ErrUncommittedIndexNotFound){ ech <- err return errors.Wrap(ctx.Err(), err.Error()) } @@ -757,7 +757,7 @@ func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err erro }() err = n.CreateIndex(ctx, poolSize) - if err != nil && err != errors.ErrUncommittedIndexNotFound { + if err != nil { return err } return n.SaveIndex(ctx) diff --git a/pkg/gateway/lb/usecase/vald.go b/pkg/gateway/lb/usecase/vald.go index c5e3a5a1b3..06ca916d1d 100644 --- a/pkg/gateway/lb/usecase/vald.go +++ b/pkg/gateway/lb/usecase/vald.go @@ -14,6 +14,7 @@ // limitations under the License. // +// Package usecase reporesents gateways usecase layer package usecase import ( From 3c4b68deb1f697f30cb27387e55afa33098b1a3a Mon Sep 17 00:00:00 2001 From: kpango Date: Sun, 17 Jan 2021 14:50:04 +0900 Subject: [PATCH 2/6] bugfix error on stream close Signed-off-by: kpango --- internal/net/grpc/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/net/grpc/stream.go b/internal/net/grpc/stream.go index 4169a0eb0e..3918960fa1 100644 --- a/internal/net/grpc/stream.go +++ b/internal/net/grpc/stream.go @@ -105,11 +105,11 @@ func BidirectionalStream(ctx context.Context, stream grpc.ServerStream, data := newData() err = stream.RecvMsg(data) if err != nil { - if err == io.EOF { + if err == io.EOF || errors.Is(err, io.EOF) { return finalize() } log.Errorf("failed to receive stream message %v", err) - continue + return errors.Wrap(finalize(), err.Error()) } if data != nil { eg.Go(safety.RecoverWithoutPanicFunc(func() (err error) { From 9bb1f2e9afcf90aa98c01cf244570cf23be6f2af Mon Sep 17 00:00:00 2001 From: Yusuke Kato Date: Mon, 18 Jan 2021 09:56:26 +0900 Subject: [PATCH 3/6] Apply suggestions from code review Co-authored-by: Rintaro Okamura --- Makefile.d/client.mk | 2 +- pkg/gateway/lb/usecase/vald.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile.d/client.mk b/Makefile.d/client.mk index c0c25e6779..a90f4762ca 100644 --- a/Makefile.d/client.mk +++ b/Makefile.d/client.mk @@ -42,4 +42,4 @@ valdcli/xpanes/insert: .PHONY: valdcli/xpanes/search ## search randomized vectors using valdcli and xpanes valdcli/xpanes/search: - xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES)) + xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES)) diff --git a/pkg/gateway/lb/usecase/vald.go b/pkg/gateway/lb/usecase/vald.go index 06ca916d1d..a6e5d7470b 100644 --- a/pkg/gateway/lb/usecase/vald.go +++ b/pkg/gateway/lb/usecase/vald.go @@ -14,7 +14,7 @@ // limitations under the License. // -// Package usecase reporesents gateways usecase layer +// Package usecase represents gateways usecase layer package usecase import ( From 8e6a9ef5870506a6ebb20911352956ab3c47a40a Mon Sep 17 00:00:00 2001 From: kpango Date: Mon, 18 Jan 2021 13:53:10 +0900 Subject: [PATCH 4/6] downgrade NGT version to 1.12.1 from 1.12.2 Signed-off-by: kpango --- versions/NGT_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/versions/NGT_VERSION b/versions/NGT_VERSION index 6b89d58f86..f8f4f03b3d 100644 --- a/versions/NGT_VERSION +++ b/versions/NGT_VERSION @@ -1 +1 @@ -1.12.2 +1.12.1 From 3fc434b6df59834269db847bb88d6bc4d54cd7f8 Mon Sep 17 00:00:00 2001 From: kpango Date: Mon, 18 Jan 2021 14:24:25 +0900 Subject: [PATCH 5/6] remove optimizer from cgo build & small refactor for backoff Signed-off-by: kpango --- Makefile.d/build.mk | 2 +- internal/backoff/backoff.go | 59 +++++++++++++++++++++---------------- versions/NGT_VERSION | 2 +- 3 files changed, 36 insertions(+), 27 deletions(-) diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index 727aea63df..70d3487a59 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -47,7 +47,7 @@ cmd/agent/core/ngt/ngt: \ GOPRIVATE=$(GOPRIVATE) \ go build \ --ldflags "-s -w -linkmode 'external' \ - -extldflags '-static -fPIC -pthread -fopenmp -std=gnu++20 -lstdc++ -O3 -lm $(EXTLDFLAGS)' \ + -extldflags '-static -fPIC -pthread -fopenmp -std=c++20 -lstdc++ -lm $(EXTLDFLAGS)' \ -X '$(GOPKG)/internal/info.Version=$(VERSION)' \ -X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \ -X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \ diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go index 09325e0309..eca0bb7692 100644 --- a/internal/backoff/backoff.go +++ b/internal/backoff/backoff.go @@ -89,7 +89,14 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter for cnt := 0; cnt < b.maxRetryCount; cnt++ { select { case <-dctx.Done(): - return nil, errors.Wrap(err, dctx.Err().Error()) + switch dctx.Err() { + case context.DeadlineExceeded: + return nil, errors.ErrBackoffTimeout(err) + case context.Canceled: + return nil, err + default: + return nil, errors.Wrap(err, dctx.Err().Error()) + } default: res, ret, err = func() (val interface{}, retryable bool, err error) { ssctx, span := trace.StartSpan(dctx, traceTag+"/"+strconv.Itoa(cnt+1)) @@ -100,34 +107,36 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter }() return f(ssctx) }() - if ret && err != nil { - if b.errLog { - log.Error(err) + if !ret { + return res, err + } + if err == nil { + return res, nil + } + if b.errLog { + log.Error(err) + } + timer.Reset(time.Duration(jdur)) + select { + case <-dctx.Done(): + switch dctx.Err() { + case context.DeadlineExceeded: + return nil, errors.ErrBackoffTimeout(err) + case context.Canceled: + return nil, err + default: + return nil, errors.Wrap(dctx.Err(), err.Error()) } - timer.Reset(time.Duration(jdur)) - select { - case <-dctx.Done(): - switch dctx.Err() { - case context.DeadlineExceeded: - return nil, errors.ErrBackoffTimeout(err) - case context.Canceled: - return nil, err - default: - return nil, errors.Wrap(dctx.Err(), err.Error()) - } - case <-timer.C: - if dur >= b.durationLimit { - dur = b.maxDuration - jdur = b.maxDuration - } else { - dur *= b.backoffFactor - jdur = b.addJitter(dur) - } - continue + case <-timer.C: + if dur >= b.durationLimit { + dur = b.maxDuration + jdur = b.maxDuration + } else { + dur *= b.backoffFactor + jdur = b.addJitter(dur) } } } - return res, err } return res, err } diff --git a/versions/NGT_VERSION b/versions/NGT_VERSION index f8f4f03b3d..81f363239f 100644 --- a/versions/NGT_VERSION +++ b/versions/NGT_VERSION @@ -1 +1 @@ -1.12.1 +1.12.3 From bcfcd1d04a17a1fa527bf3e8bf8c5235aeb2cb96 Mon Sep 17 00:00:00 2001 From: kpango Date: Mon, 18 Jan 2021 14:27:13 +0900 Subject: [PATCH 6/6] add gnu option for build flag Signed-off-by: kpango --- Makefile.d/build.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index 70d3487a59..a628687675 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -47,7 +47,7 @@ cmd/agent/core/ngt/ngt: \ GOPRIVATE=$(GOPRIVATE) \ go build \ --ldflags "-s -w -linkmode 'external' \ - -extldflags '-static -fPIC -pthread -fopenmp -std=c++20 -lstdc++ -lm $(EXTLDFLAGS)' \ + -extldflags '-static -fPIC -pthread -fopenmp -std=gnu++20 -lstdc++ -lm $(EXTLDFLAGS)' \ -X '$(GOPKG)/internal/info.Version=$(VERSION)' \ -X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \ -X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \