Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Aug 24, 2022
1 parent a4a6db0 commit f93beea
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 171 deletions.
2 changes: 1 addition & 1 deletion pkg/agent/core/ngt/handler/grpc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *server) CreateAndSaveIndex(ctx context.Context, c *payload.Control_Crea
log.Error(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeFailedInternal(err.Error())...)
span.SetAttributes(trace.StatusCodeInternal(err.Error())...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
Expand Down
135 changes: 128 additions & 7 deletions pkg/gateway/filter/handler/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/errdetails"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/trace"
)
Expand All @@ -42,6 +43,8 @@ type server struct {
eg errgroup.Group
defaultVectorizer string
defaultFilters []string
name string
ip string
ingress ingress.Client
egress egress.Client
gateway client.Client
Expand All @@ -68,7 +71,7 @@ func New(opts ...Option) vald.ServerWithFilter {
return s
}

func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectRequest) (*payload.Search_Response, error) {
func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectRequest) (res *payload.Search_Response, err error) {
ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.SearchObjectRPCName)
defer func() {
if span != nil {
Expand All @@ -77,27 +80,119 @@ func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectReq
}()
vr := req.GetVectorizer()
if vr == nil || vr.GetPort() == 0 {
return nil, status.WrapWithInvalidArgument("SearchObject API vectorizer configuration is invalid", errors.ErrFilterNotFound, info.Get())
err = status.WrapWithInvalidArgument(vald.SearchObjectRPCName+" API vectorizer configuration is invalid", errors.ErrFilterNotFound,
&errdetails.RequestInfo{
RequestId: req.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(req),
},
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Field: "vectorizer port",
Description: err.Error(),
},
},
},
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchObjectRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeInvalidArgument(err.Error())...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
}
if vr.GetHost() == "" {
vr.Host = "localhost"
}
target := fmt.Sprintf("%s:%d", vr.GetHost(), vr.GetPort())
if len(target) == 0 {
if len(s.Vectorizer) == 0 {
return nil, status.WrapWithInvalidArgument("SearchObject API vectorizer configuration is invalid", errors.ErrFilterNotFound, info.Get())
err = status.WrapWithInvalidArgument(vald.SearchObjectRPCName+" API vectorizer configuration is invalid", errors.ErrFilterNotFound,
&errdetails.RequestInfo{
RequestId: req.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(req),
},
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Field: "vectorizer targets",
Description: err.Error(),
},
},
},
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchObjectRPCName,
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeInvalidArgument(err.Error())...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
}
target = s.Vectorizer
}
c, err := s.ingress.Target(ctx, target)
if err != nil {
return nil, status.WrapWithUnavailable("SearchObject API target filter API unavailable", err, req, info.Get())
err = status.WrapWithUnavailable("SearchObject API target filter API unavailable", err,
&errdetails.RequestInfo{
RequestId: req.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(req),
},
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Field: "vectorizer targets",
Description: err.Error(),
},
},
},
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchObject",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}, info.Get())
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeUnavailable(err.Error())...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
}
vec, err := c.GenVector(ctx, &payload.Object_Blob{
Object: req.GetObject(),
})
if err != nil {
return nil, status.WrapWithInternal("SearchObject API failed to extract vector from filter", err, req, info.Get())
err = status.WrapWithInternal("SearchObject API failed to extract vector from filter", err,
&errdetails.RequestInfo{
RequestId: req.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(req),
},
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Field: "vectorizer targets",
Description: err.Error(),
},
},
},
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchObject",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}, info.Get())
log.Warn(err)
if span != nil {
span.RecordError(err)
span.SetAttributes(trace.StatusCodeInternal(err.Error())...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
}
return s.Search(ctx, &payload.Search_Request{
Vector: vec.GetVector(),
Expand All @@ -123,10 +218,36 @@ func (s *server) MultiSearchObject(ctx context.Context, reqs *payload.Search_Mul
wg.Add(1)
s.eg.Go(func() error {
defer wg.Done()
ctx, sspan := trace.StartSpan(ctx, fmt.Sprintf("%s.%s/errgroup.Go/id-%d", apiName, vald.MultiSearchObjectRPCName, idx))
defer func() {
if sspan != nil {
sspan.End()
}
}()
r, err := s.SearchObject(ctx, query)
if err != nil {
if span != nil {
span.SetStatus(trace.StatusCodeNotFound(err.Error()))
err = status.WrapWithNotFound(vald.MultiSearchObjectRPCName+" API object "+string(query.GetObject())+"'s search request result not found", err,
&errdetails.RequestInfo{
RequestId: req.GetConfig().GetRequestId(),
ServingData: errdetails.Serialize(req),
},
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Field: "vectorizer targets",
Description: err.Error(),
},
},
},
&errdetails.ResourceInfo{
ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchObject",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}, info.Get())
log.Warn(err)
if sspan != nil {
sspan.RecordError(err)
sspan.SetAttributes(trace.StatusCodeInternal(err.Error())...)
sspan.SetStatus(trace.StatusError, err.Error())
}
mu.Lock()
if errs == nil {
Expand Down
29 changes: 29 additions & 0 deletions pkg/gateway/filter/handler/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,48 @@
package grpc

import (
"os"
"runtime"

"github.com/vdaas/vald/internal/client/v1/client/filter/egress"
"github.com/vdaas/vald/internal/client/v1/client/filter/ingress"
"github.com/vdaas/vald/internal/client/v1/client/vald"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net"
)

type Option func(*server)

var defaultOptions = []Option{
WithErrGroup(errgroup.Get()),
WithStreamConcurrency(runtime.GOMAXPROCS(-1) * 10),
WithName(func() string {
name, err := os.Hostname()
if err != nil {
log.Warn(err)
}
return name
}()),
WithIP(net.LoadLocalIP()),
}

// WithIP returns the option to set the IP for server.
func WithIP(ip string) Option {
return func(s *server) {
if len(ip) != 0 {
s.ip = ip
}
}
}

// WithName returns the option to set the name for server.
func WithName(name string) Option {
return func(s *server) {
if len(name) != 0 {
s.name = name
}
}
}

func WithIngressFilterClient(c ingress.Client) Option {
Expand Down
Loading

0 comments on commit f93beea

Please sign in to comment.