Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

os.free nil pointer failure in ngt cgo due to create index hang up #930

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ PORT ?= 80
NUMBER ?= 10
DIMENSION ?= 6
NUMPANES ?= 4
MEAN ?= 0.0
STDDEV ?= 1.0

BODY = ""

Expand Down
2 changes: 1 addition & 1 deletion Makefile.d/build.mk
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ cmd/agent/core/ngt/ngt: \
GOPRIVATE=$(GOPRIVATE) \
go build \
--ldflags "-s -w -linkmode 'external' \
-extldflags '-static -fPIC -pthread -fopenmp -std=gnu++20 -lstdc++ -O3 -lm $(EXTLDFLAGS)' \
-extldflags '-static -fPIC -pthread -fopenmp -std=gnu++20 -lstdc++ -lm $(EXTLDFLAGS)' \
-X '$(GOPKG)/internal/info.Version=$(VERSION)' \
-X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \
-X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \
Expand Down
4 changes: 2 additions & 2 deletions Makefile.d/client.mk
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ endif
.PHONY: valdcli/xpanes/insert
## insert randomized vectors using valdcli and xpanes
valdcli/xpanes/insert:
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-insert --elapsed-time" $$(seq 1 $(NUMPANES))
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) --with-ids | valdcli -h $(HOST) -p $(PORT) stream-insert --elapsed-time" $$(seq 1 $(NUMPANES))

.PHONY: valdcli/xpanes/search
## search randomized vectors using valdcli and xpanes
valdcli/xpanes/search:
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES))
xpanes -c "valdcli rand-vecs -n $(NUMBER) -d $(DIMENSION) --gaussian --gaussian-mean $(MEAN) --gaussian-stddev $(STDDEV) | valdcli -h $(HOST) -p $(PORT) stream-search --elapsed-time" $$(seq 1 $(NUMPANES))
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

[![License: Apache 2.0](https://img.shields.io/github/license/vdaas/vald.svg?style=flat-square)](https://opensource.org/licenses/Apache-2.0)
[![release](https://img.shields.io/github/release/vdaas/vald.svg?style=flat-square)](https://github.com/vdaas/vald/releases/latest)
[![GoDoc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat-square)](https://pkg.go.dev/github.com/vdaas/vald)
[![Go Reference](https://pkg.go.dev/badge/github.com/vdaas/vald.svg)](https://pkg.go.dev/github.com/vdaas/vald)
[![Codacy Badge](https://img.shields.io/codacy/grade/a6e544eee7bc49e08a000bb10ba3deed?style=flat-square)](https://www.codacy.com/app/i.can.feel.gravity/vald?utm_source=github.com&utm_medium=referral&utm_content=vdaas/vald&utm_campaign=Badge_Grade)
[![Go Report Card](https://goreportcard.com/badge/github.com/vdaas/vald?style=flat-square)](https://goreportcard.com/report/github.com/vdaas/vald)
[![DepShield Badge](https://depshield.sonatype.org/badges/vdaas/vald/depshield.svg?style=flat-square)](https://depshield.github.io)
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/core/ngt/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ ngt:
default_epsilon: 0.01
default_pool_size: 10000
default_radius: -1
dimension: 4096
dimension: 6
distance_type: l2
enable_in_memory_mode: true
enable_proactive_gc: true
Expand Down
59 changes: 34 additions & 25 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,14 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter
for cnt := 0; cnt < b.maxRetryCount; cnt++ {
select {
case <-dctx.Done():
return nil, errors.Wrap(err, dctx.Err().Error())
switch dctx.Err() {
case context.DeadlineExceeded:
return nil, errors.ErrBackoffTimeout(err)
case context.Canceled:
return nil, err
default:
return nil, errors.Wrap(err, dctx.Err().Error())
}
default:
res, ret, err = func() (val interface{}, retryable bool, err error) {
ssctx, span := trace.StartSpan(dctx, traceTag+"/"+strconv.Itoa(cnt+1))
Expand All @@ -100,34 +107,36 @@ func (b *backoff) Do(ctx context.Context, f func(ctx context.Context) (val inter
}()
return f(ssctx)
}()
if ret && err != nil {
if b.errLog {
log.Error(err)
if !ret {
return res, err
}
if err == nil {
return res, nil
}
if b.errLog {
log.Error(err)
}
timer.Reset(time.Duration(jdur))
select {
case <-dctx.Done():
switch dctx.Err() {
case context.DeadlineExceeded:
return nil, errors.ErrBackoffTimeout(err)
case context.Canceled:
return nil, err
default:
return nil, errors.Wrap(dctx.Err(), err.Error())
}
timer.Reset(time.Duration(jdur))
select {
case <-dctx.Done():
switch dctx.Err() {
case context.DeadlineExceeded:
return nil, errors.ErrBackoffTimeout(err)
case context.Canceled:
return nil, err
default:
return nil, errors.Wrap(dctx.Err(), err.Error())
}
case <-timer.C:
if dur >= b.durationLimit {
dur = b.maxDuration
jdur = b.maxDuration
} else {
dur *= b.backoffFactor
jdur = b.addJitter(dur)
}
continue
case <-timer.C:
if dur >= b.durationLimit {
dur = b.maxDuration
jdur = b.maxDuration
} else {
dur *= b.backoffFactor
jdur = b.addJitter(dur)
}
}
}
return res, err
}
return res, err
}
Expand Down
20 changes: 5 additions & 15 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,24 +393,14 @@ func (n *ngt) BulkInsert(vecs [][]float32) ([]uint, []error) {
ids := make([]uint, 0, len(vecs))
errs := make([]error, 0, len(vecs))

dim := int(n.dimension)
var id uint
n.mu.Lock()
for _, vec := range vecs {
id = 0
if len(vec) != dim {
errs = append(errs, errors.ErrIncompatibleDimensionSize(len(vec), dim))
} else {
// n.mu.Lock()
id = uint(C.ngt_insert_index_as_float(n.index, (*C.float)(&vec[0]), C.uint32_t(n.dimension), n.ebuf))
// n.mu.Unlock()
if id == 0 {
errs = append(errs, n.newGoError(n.ebuf))
}
log.Infof("started to bulk insert %d of vectors", len(vecs))
for i, vec := range vecs {
id, err := n.Insert(vec)
if err != nil {
errs = append(errs, errors.Wrapf(err, "bulkinsert error detected index number: %d,\tid: %d", i, id))
}
ids = append(ids, id)
}
n.mu.Unlock()

return ids, errs
}
Expand Down
9 changes: 6 additions & 3 deletions internal/net/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func BidirectionalStream(ctx context.Context, stream grpc.ServerStream,
gerrs.Roots = append(gerrs.Roots, gerr)
return true
})
if errs == nil {
return nil
}
st, err := status.New(status.Unknown, errs.Error()).WithDetails(gerrs)
if err != nil {
log.Warn(err)
Expand All @@ -102,11 +105,11 @@ func BidirectionalStream(ctx context.Context, stream grpc.ServerStream,
data := newData()
err = stream.RecvMsg(data)
if err != nil {
if err == io.EOF {
if err == io.EOF || errors.Is(err, io.EOF) {
return finalize()
}
log.Error(err)
continue
log.Errorf("failed to receive stream message %v", err)
return errors.Wrap(finalize(), err.Error())
}
if data != nil {
eg.Go(safety.RecoverWithoutPanicFunc(func() (err error) {
Expand Down
18 changes: 17 additions & 1 deletion internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"syscall"

"github.com/vdaas/vald/internal/config"
"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/info"
Expand Down Expand Up @@ -95,6 +96,21 @@ func Do(ctx context.Context, opts ...Option) error {
log.Init()
}

log.Debugf("version info:\t\t%s\n\nconfiguration:\t\t%s\n\n",
func() string {
b, err := json.Marshal(info.Get())
if err != nil {
return "failed to serialize build information"
}
return string(b)
}(), func() string {
b, err := json.Marshal(cfg)
if err != nil {
return "failed to serialize configuration"
}
return string(b)
}())

// set location temporarily for initialization logging
location.Set(ccfg.TZ)

Expand Down Expand Up @@ -197,7 +213,7 @@ func Run(ctx context.Context, run Runner, name string) (err error) {
}

err = errgroup.Wait()
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(errors.ErrRunnerWait(name, err))
if _, ok := emap[err.Error()]; !ok {
errs = append(errs, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
select {
case <-ctx.Done():
err = n.CreateIndex(ctx, n.poolSize)
if err != nil {
if err != nil && !errors.Is(err, errors.ErrUncommittedIndexNotFound){
ech <- err
return errors.Wrap(ctx.Err(), err.Error())
}
Expand Down Expand Up @@ -757,7 +757,7 @@ func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err erro
}()

err = n.CreateIndex(ctx, poolSize)
if err != nil && err != errors.ErrUncommittedIndexNotFound {
if err != nil {
return err
}
return n.SaveIndex(ctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/gateway/lb/usecase/vald.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.
//

// Package usecase represents the gateway's usecase layer.
package usecase

import (
Expand Down
2 changes: 1 addition & 1 deletion versions/NGT_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.12.2
1.12.3