diff --git a/pkg/agent/core/ngt/handler/grpc/index.go b/pkg/agent/core/ngt/handler/grpc/index.go index 3e9f7240b69..24d03a88488 100644 --- a/pkg/agent/core/ngt/handler/grpc/index.go +++ b/pkg/agent/core/ngt/handler/grpc/index.go @@ -154,7 +154,7 @@ func (s *server) CreateAndSaveIndex(ctx context.Context, c *payload.Control_Crea log.Error(err) if span != nil { span.RecordError(err) - span.SetAttributes(trace.StatusCodeFailedInternal(err.Error())...) + span.SetAttributes(trace.StatusCodeInternal(err.Error())...) span.SetStatus(trace.StatusError, err.Error()) } return nil, err diff --git a/pkg/gateway/filter/handler/grpc/handler.go b/pkg/gateway/filter/handler/grpc/handler.go index 5eae935cccb..eaf23c1925b 100644 --- a/pkg/gateway/filter/handler/grpc/handler.go +++ b/pkg/gateway/filter/handler/grpc/handler.go @@ -34,6 +34,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/errdetails" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/trace" ) @@ -42,6 +43,8 @@ type server struct { eg errgroup.Group defaultVectorizer string defaultFilters []string + name string + ip string ingress ingress.Client egress egress.Client gateway client.Client @@ -68,7 +71,7 @@ func New(opts ...Option) vald.ServerWithFilter { return s } -func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectRequest) (*payload.Search_Response, error) { +func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectRequest) (res *payload.Search_Response, err error) { ctx, span := trace.StartSpan(ctx, apiName+"/"+vald.SearchObjectRPCName) defer func() { if span != nil { @@ -77,7 +80,30 @@ func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectReq }() vr := req.GetVectorizer() if vr == nil || vr.GetPort() == 0 { - return nil, status.WrapWithInvalidArgument("SearchObject API vectorizer configuration is invalid", errors.ErrFilterNotFound, info.Get()) + err = status.WrapWithInvalidArgument(vald.SearchObjectRPCName+" API vectorizer configuration is invalid", errors.ErrFilterNotFound, + &errdetails.RequestInfo{ + RequestId: req.GetConfig().GetRequestId(), + ServingData: errdetails.Serialize(req), + }, + &errdetails.BadRequest{ + FieldViolations: []*errdetails.BadRequestFieldViolation{ + { + Field: "vectorizer port", + Description: err.Error(), + }, + }, + }, + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchObjectRPCName, + ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), + }) + log.Warn(err) + if span != nil { + span.RecordError(err) + span.SetAttributes(trace.StatusCodeInvalidArgument(err.Error())...) + span.SetStatus(trace.StatusError, err.Error()) + } + return nil, err } if vr.GetHost() == "" { vr.Host = "localhost" @@ -85,19 +111,88 @@ func (s *server) SearchObject(ctx context.Context, req *payload.Search_ObjectReq target := fmt.Sprintf("%s:%d", vr.GetHost(), vr.GetPort()) if len(target) == 0 { if len(s.Vectorizer) == 0 { - return nil, status.WrapWithInvalidArgument("SearchObject API vectorizer configuration is invalid", errors.ErrFilterNotFound, info.Get()) + err = status.WrapWithInvalidArgument(vald.SearchObjectRPCName+" API vectorizer configuration is invalid", errors.ErrFilterNotFound, + &errdetails.RequestInfo{ + RequestId: req.GetConfig().GetRequestId(), + ServingData: errdetails.Serialize(req), + }, + &errdetails.BadRequest{ + FieldViolations: []*errdetails.BadRequestFieldViolation{ + { + Field: "vectorizer targets", + Description: err.Error(), + }, + }, + }, + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchObjectRPCName, + ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), + }) + log.Warn(err) + if span != nil { + span.RecordError(err) + span.SetAttributes(trace.StatusCodeInvalidArgument(err.Error())...) + span.SetStatus(trace.StatusError, err.Error()) + } + return nil, err } target = s.Vectorizer } c, err := s.ingress.Target(ctx, target) if err != nil { - return nil, status.WrapWithUnavailable("SearchObject API target filter API unavailable", err, req, info.Get()) + err = status.WrapWithUnavailable("SearchObject API target filter API unavailable", err, + &errdetails.RequestInfo{ + RequestId: req.GetConfig().GetRequestId(), + ServingData: errdetails.Serialize(req), + }, + &errdetails.BadRequest{ + FieldViolations: []*errdetails.BadRequestFieldViolation{ + { + Field: "vectorizer targets", + Description: err.Error(), + }, + }, + }, + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchObject", + ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), + }, info.Get()) + log.Warn(err) + if span != nil { + span.RecordError(err) + span.SetAttributes(trace.StatusCodeUnavailable(err.Error())...) + span.SetStatus(trace.StatusError, err.Error()) + } + return nil, err } vec, err := c.GenVector(ctx, &payload.Object_Blob{ Object: req.GetObject(), }) if err != nil { - return nil, status.WrapWithInternal("SearchObject API failed to extract vector from filter", err, req, info.Get()) + err = status.WrapWithInternal("SearchObject API failed to extract vector from filter", err, + &errdetails.RequestInfo{ + RequestId: req.GetConfig().GetRequestId(), + ServingData: errdetails.Serialize(req), + }, + &errdetails.BadRequest{ + FieldViolations: []*errdetails.BadRequestFieldViolation{ + { + Field: "vectorizer targets", + Description: err.Error(), + }, + }, + }, + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchObject", + ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), + }, info.Get()) + log.Warn(err) + if span != nil { + span.RecordError(err) + span.SetAttributes(trace.StatusCodeInternal(err.Error())...) + span.SetStatus(trace.StatusError, err.Error()) + } + return nil, err } return s.Search(ctx, &payload.Search_Request{ Vector: vec.GetVector(), @@ -123,10 +218,36 @@ func (s *server) MultiSearchObject(ctx context.Context, reqs *payload.Search_Mul wg.Add(1) s.eg.Go(func() error { defer wg.Done() + ctx, sspan := trace.StartSpan(ctx, fmt.Sprintf("%s.%s/errgroup.Go/id-%d", apiName, vald.MultiSearchObjectRPCName, idx)) + defer func() { + if sspan != nil { + sspan.End() + } + }() r, err := s.SearchObject(ctx, query) if err != nil { - if span != nil { - span.SetStatus(trace.StatusCodeNotFound(err.Error())) + err = status.WrapWithNotFound(vald.MultiSearchObjectRPCName+" API object "+string(query.GetObject())+"'s search request result not found", err, + &errdetails.RequestInfo{ + RequestId: req.GetConfig().GetRequestId(), + ServingData: errdetails.Serialize(req), + }, + &errdetails.BadRequest{ + FieldViolations: []*errdetails.BadRequestFieldViolation{ + { + Field: "vectorizer targets", + Description: err.Error(), + }, + }, + }, + &errdetails.ResourceInfo{ + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchObject", + ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), + }, info.Get()) + log.Warn(err) + if sspan != nil { + sspan.RecordError(err) + sspan.SetAttributes(trace.StatusCodeInternal(err.Error())...) + sspan.SetStatus(trace.StatusError, err.Error()) } mu.Lock() if errs == nil { diff --git a/pkg/gateway/filter/handler/grpc/option.go b/pkg/gateway/filter/handler/grpc/option.go index 27df1807651..7f55caeea8f 100644 --- a/pkg/gateway/filter/handler/grpc/option.go +++ b/pkg/gateway/filter/handler/grpc/option.go @@ -18,12 +18,15 @@ package grpc import ( + "os" "runtime" "github.com/vdaas/vald/internal/client/v1/client/filter/egress" "github.com/vdaas/vald/internal/client/v1/client/filter/ingress" "github.com/vdaas/vald/internal/client/v1/client/vald" "github.com/vdaas/vald/internal/errgroup" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net" ) type Option func(*server) @@ -31,6 +34,32 @@ type Option func(*server) var defaultOptions = []Option{ WithErrGroup(errgroup.Get()), WithStreamConcurrency(runtime.GOMAXPROCS(-1) * 10), + WithName(func() string { + name, err := os.Hostname() + if err != nil { + log.Warn(err) + } + return name + }()), + WithIP(net.LoadLocalIP()), +} + +// WithIP returns the option to set the IP for server. +func WithIP(ip string) Option { + return func(s *server) { + if len(ip) != 0 { + s.ip = ip + } + } +} + +// WithName returns the option to set the name for server. +func WithName(name string) Option { + return func(s *server) { + if len(name) != 0 { + s.name = name + } + } } func WithIngressFilterClient(c ingress.Client) Option { diff --git a/pkg/gateway/lb/handler/grpc/handler.go b/pkg/gateway/lb/handler/grpc/handler.go index a71c663f278..cc1fb884cc7 100644 --- a/pkg/gateway/lb/handler/grpc/handler.go +++ b/pkg/gateway/lb/handler/grpc/handler.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "math/big" + "strconv" "sync" "sync/atomic" "time" @@ -78,7 +79,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo uuid := meta.GetId() if len(uuid) == 0 { err = errors.ErrInvalidUUID(uuid) - err = status.WrapWithInvalidArgument(fmt.Sprintf("Exists API invalid argument for uuid \"%s\" detected", uuid), err, + err = status.WrapWithInvalidArgument(vald.ExistsRPCName+" API invalid argument for uuid \""+uuid+"\" detected", err, &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(meta), @@ -92,7 +93,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo }, }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Exists", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.ExistsRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), }) log.Warn(err) @@ -112,7 +113,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo ctx, cancel = context.WithCancel(ctx) var once sync.Once ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { - sctx, sspan := trace.StartSpan(ctx, apiName+".Exists/"+target) + sctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.ExistsRPCName+"/"+target) defer func() { if sspan != nil { sspan.End() @@ -129,7 +130,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo sspan.RecordError(err) sspan.SetAttributes(trace.StatusCodeCancelled( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.Exists.BroadCast/" + + "/vald.v1." + vald.ExistsRPCName + ".BroadCast/" + target + " canceled: " + err.Error())...) sspan.SetStatus(trace.StatusError, err.Error()) } @@ -140,7 +141,7 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo sspan.RecordError(err) sspan.SetAttributes(trace.StatusCodeDeadlineExceeded( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.Exists.BroadCast/" + + "/vald.v1." + vald.ExistsRPCName + ".BroadCast/" + target + " deadline_exceeded: " + err.Error())...) sspan.SetStatus(trace.StatusError, err.Error()) } @@ -150,13 +151,13 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo st *status.Status msg string ) - st, msg, err = status.ParseError(err, codes.NotFound, fmt.Sprintf("error Exists API meta %s's uuid not found", uuid), + st, msg, err = status.ParseError(err, codes.NotFound, "error "+vald.ExistsRPCName+" API meta "+uuid+"'s uuid not found", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(meta), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Exists", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.ExistsRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }) if sspan != nil { @@ -189,13 +190,13 @@ func (s *server) Exists(ctx context.Context, meta *payload.Object_ID) (id *paylo if err == nil { err = errors.ErrObjectIDNotFound(uuid) } - st, msg, err := status.ParseError(err, codes.NotFound, fmt.Sprintf("error Exists API meta %s's uuid not found", uuid), + st, msg, err := status.ParseError(err, codes.NotFound, "error "+vald.ExistsRPCName+" API meta "+uuid+"'s uuid not found", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(meta), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Exists", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.ExistsRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -219,7 +220,7 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * vl := len(req.GetVector()) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("Search API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.SearchRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(req), @@ -258,12 +259,12 @@ func (s *server) Search(ctx context.Context, req *payload.Search_Request) (res * }) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse Search gRPC error response", + "failed to parse "+vald.SearchRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Search", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -288,7 +289,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) uuid := req.GetId() if len(uuid) == 0 { err = errors.ErrInvalidMetaDataConfig - err = status.WrapWithInvalidArgument("SearchByID API invalid uuid", err, + err = status.WrapWithInvalidArgument(vald.SearchByIDRPCName+" API invalid uuid", err, &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(req), @@ -302,7 +303,7 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) }, }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchByID", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchByIDRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), }) if span != nil { @@ -335,13 +336,13 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) EgressFilters: cfg.GetEgressFilters(), } if err != nil { - _, _, err := status.ParseError(err, codes.NotFound, fmt.Sprintf("SearchByID API failed to get uuid %s's object", req.GetId()), + _, _, err := status.ParseError(err, codes.NotFound, vald.SearchByIDRPCName+" API failed to get uuid "+req.GetId()+"'s object", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(oreq), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchByIDRPCName + "." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) var serr error @@ -352,12 +353,12 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) return res, nil } err = errors.Wrap(err, serr.Error()) - st, msg, serr := status.ParseError(err, codes.Internal, "SearchByID API failed to process search request", + st, msg, serr := status.ParseError(err, codes.Internal, vald.SearchByIDRPCName+" API failed to process search request", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchByID", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchByIDRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -372,12 +373,12 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) Config: scfg, }) if err != nil { - _, _, err := status.ParseError(err, codes.Internal, "SearchByID API failed to process search request", + _, _, err := status.ParseError(err, codes.Internal, vald.SearchByIDRPCName+" API failed to process search request", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Search", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchByIDRPCName + "." + vald.SearchRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) var serr error @@ -387,12 +388,12 @@ func (s *server) SearchByID(ctx context.Context, req *payload.Search_IDRequest) if serr == nil { return res, nil } - st, msg, serr := status.ParseError(serr, codes.Internal, "SearchByID API failed to process search request", + st, msg, serr := status.ParseError(serr, codes.Internal, vald.SearchByIDRPCName+" API failed to process search request", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.SearchByID", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.SearchByIDRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) err = errors.Wrap(err, serr.Error()) @@ -474,7 +475,7 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, case err != nil: st, msg, err := status.ParseError(err, codes.Internal, "failed to parse search gRPC error response", &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Search", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }) if sspan != nil { @@ -491,7 +492,7 @@ func (s *server) search(ctx context.Context, cfg *payload.Search_Config, case r == nil || len(r.GetResults()) == 0: err = status.WrapWithNotFound("failed to process search request", errors.ErrEmptySearchResult, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Search", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.search", ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }) if sspan != nil { @@ -712,7 +713,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) func() interface{} { return new(payload.Search_Request) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Search_Request) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamSearch/requestID-"+req.GetConfig().GetRequestId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamSearchRPCName+"/requestID-"+req.GetConfig().GetRequestId()) defer func() { if sspan != nil { sspan.End() @@ -720,7 +721,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) }() res, err := s.Search(ctx, data.(*payload.Search_Request)) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Search gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -741,7 +742,7 @@ func (s *server) StreamSearch(stream vald.Search_StreamSearchServer) (err error) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse StreamSearch gRPC error response") + "failed to parse "+vald.StreamSearchRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -763,7 +764,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er func() interface{} { return new(payload.Search_IDRequest) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Search_IDRequest) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamSearchByID/id-"+req.GetId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamSearchByIDRPCName+"/id-"+req.GetId()) defer func() { if sspan != nil { sspan.End() @@ -771,7 +772,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er }() res, err := s.SearchByID(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse SearchByID gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchByIDRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -792,7 +793,7 @@ func (s *server) StreamSearchByID(stream vald.Search_StreamSearchByIDServer) (er if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse StreamSearchByID gRPC error response") + "failed to parse "+vald.StreamSearchByIDRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -823,7 +824,7 @@ func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequ vl := len(req.GetVector()) if vl < algorithm.MinimumVectorDimensionSize { err := errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("MultiSearch API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.MultiSearchRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(req), @@ -846,7 +847,7 @@ func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequ wg.Add(1) s.eg.Go(func() error { defer wg.Done() - ctx, sspan := trace.StartSpan(ctx, fmt.Sprintf("%s.MultiSearch/errgroup.Go/id-%d", apiName, idx)) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.MultiSearchRPCName+"/errgroup.Go/id-"+strconv.Itoa(idx)) defer func() { if sspan != nil { sspan.End() @@ -854,7 +855,7 @@ func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequ }() r, err := s.Search(ctx, query) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Search gRPC error response", + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.SearchRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: query.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(query), @@ -879,13 +880,13 @@ func (s *server) MultiSearch(ctx context.Context, reqs *payload.Search_MultiRequ } wg.Wait() if errs != nil { - st, msg, err := status.ParseError(errs, codes.Internal, "failed to parse MultiSearch gRPC error response", + st, msg, err := status.ParseError(errs, codes.Internal, "failed to parse "+vald.MultiSearchRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(rids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiSearch", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiSearchRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -917,7 +918,7 @@ func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_Multi rids = append(rids, req.GetConfig().GetRequestId()) wg.Add(1) s.eg.Go(func() error { - sctx, sspan := trace.StartSpan(ctx, fmt.Sprintf("%s.MultiSearchByID/errgroup.Go/id-%d", apiName, idx)) + sctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.MultiSearchByIDRPCName+"/errgroup.Go/id-"+strconv.Itoa(idx)) defer func() { if span != nil { span.End() @@ -927,7 +928,7 @@ func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_Multi r, err := s.SearchByID(sctx, query) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse SearchByID gRPC error response", + "failed to parse "+vald.SearchByIDRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: query.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(query), @@ -953,13 +954,13 @@ func (s *server) MultiSearchByID(ctx context.Context, reqs *payload.Search_Multi wg.Wait() if errs != nil { st, msg, err := status.ParseError(errs, codes.Internal, - "failed to parse MultiSearchByID gRPC error response", + "failed to parse "+vald.MultiSearchByIDRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(rids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiSearchByID", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiSearchByIDRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -982,7 +983,7 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) vl := len(req.GetVector()) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("LinearSearch API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.LinearSearchRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(req), @@ -1019,12 +1020,12 @@ func (s *server) LinearSearch(ctx context.Context, req *payload.Search_Request) }) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse LinearSearch gRPC error response", + "failed to parse "+vald.LinearSearchRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.LinearSearch", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.LinearSearchRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -1048,7 +1049,7 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq }() if len(req.GetId()) == 0 { err = errors.ErrInvalidMetaDataConfig - err = status.WrapWithInvalidArgument("LinearSearchByID API invalid uuid", err, + err = status.WrapWithInvalidArgument(vald.LinearSearchByIDRPCName+" API invalid uuid", err, &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(req), @@ -1089,13 +1090,13 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq EgressFilters: cfg.GetEgressFilters(), } if err != nil { - _, _, err := status.ParseError(err, codes.NotFound, fmt.Sprintf("LinearSearchByID API failed to get uuid %s's object", req.GetId()), + _, _, err := status.ParseError(err, codes.NotFound, fmt.Sprintf(vald.LinearSearchByIDRPCName+" API failed to get uuid %s's object", req.GetId()), &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(oreq), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.LinearSearchByIDRPCName + "." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) var serr error @@ -1106,12 +1107,12 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq return res, nil } err = errors.Wrap(err, serr.Error()) - st, msg, serr := status.ParseError(err, codes.Internal, "LinearSearchByID API failed to process search request", + st, msg, serr := status.ParseError(err, codes.Internal, vald.LinearSearchByIDRPCName+" API failed to process search request", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.LinearSearchByID", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.LinearSearchByIDRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -1127,12 +1128,12 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq Config: scfg, }) if err != nil { - _, _, err := status.ParseError(err, codes.Internal, "LinearSearchByID API failed to process search request", + _, _, err := status.ParseError(err, codes.Internal, vald.LinearSearchByIDRPCName+" API failed to process search request", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.LinearSearch", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.LinearSearchByIDRPCName + "." + vald.LinearSearchRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) var serr error @@ -1142,12 +1143,12 @@ func (s *server) LinearSearchByID(ctx context.Context, req *payload.Search_IDReq if serr == nil { return res, nil } - st, msg, serr := status.ParseError(serr, codes.Internal, "LinearSearchByID API failed to process search request", + st, msg, serr := status.ParseError(serr, codes.Internal, vald.LinearSearchByIDRPCName+" API failed to process search request", &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.LinearSearchByID", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.LinearSearchByIDRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) err = errors.Wrap(err, serr.Error()) @@ -1172,7 +1173,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) func() interface{} { return new(payload.Search_Request) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Search_Request) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamLinearSearch/requestID-"+req.GetConfig().GetRequestId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamLinearSearchRPCName+"/requestID-"+req.GetConfig().GetRequestId()) defer func() { if sspan != nil { sspan.End() @@ -1180,7 +1181,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) }() res, err := s.LinearSearch(ctx, data.(*payload.Search_Request)) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse LinearSearch gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -1201,7 +1202,7 @@ func (s *server) StreamLinearSearch(stream vald.Search_StreamLinearSearchServer) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse StreamLinearSearch gRPC error response") + "failed to parse "+vald.StreamLinearSearchRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -1223,7 +1224,7 @@ func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByI func() interface{} { return new(payload.Search_IDRequest) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Search_IDRequest) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamLinearSearchByID/id-"+req.GetId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamLinearSearchByIDRPCName+"/id-"+req.GetId()) defer func() { if sspan != nil { sspan.End() @@ -1231,7 +1232,7 @@ func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByI }() res, err := s.LinearSearchByID(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse LinearSearchByID gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -1252,7 +1253,7 @@ func (s *server) StreamLinearSearchByID(stream vald.Search_StreamLinearSearchByI if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse StreamLinearSearchByID gRPC error response") + "failed to parse "+vald.StreamLinearSearchByIDRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -1283,7 +1284,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_Mul vl := len(req.GetVector()) if vl < algorithm.MinimumVectorDimensionSize { err := errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("MultiLinearSearch API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.MultiLinearSearchRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: req.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(req), @@ -1306,7 +1307,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_Mul wg.Add(1) s.eg.Go(func() error { defer wg.Done() - ctx, sspan := trace.StartSpan(ctx, fmt.Sprintf("%s.MultiLinearSearch/errgroup.Go/id-%d", apiName, idx)) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.MultiLinearSearchRPCName+"/errgroup.Go/id-"+strconv.Itoa(idx)) defer func() { if sspan != nil { sspan.End() @@ -1314,7 +1315,7 @@ func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_Mul }() r, err := s.LinearSearch(ctx, query) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse LinearSearch gRPC error response", + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.LinearSearchRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: query.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(query), @@ -1339,13 +1340,13 @@ func (s *server) MultiLinearSearch(ctx context.Context, reqs *payload.Search_Mul } wg.Wait() if errs != nil { - st, msg, err := status.ParseError(errs, codes.Internal, "failed to parse MultiLinearSearch gRPC error response", + st, msg, err := status.ParseError(errs, codes.Internal, "failed to parse "+vald.MultiLinearSearchRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(rids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiLinearSearch", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiLinearSearchRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -1377,17 +1378,17 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search rids = append(rids, req.GetConfig().GetRequestId()) wg.Add(1) s.eg.Go(func() error { - sctx, sspan := trace.StartSpan(ctx, fmt.Sprintf("%s.MultiLinearSearchByID/errgroup.Go/id-%d", apiName, idx)) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.MultiLinearSearchByIDRPCName+"/errgroup.Go/id-"+strconv.Itoa(idx)) defer func() { - if span != nil { - span.End() + if sspan != nil { + sspan.End() } }() defer wg.Done() - r, err := s.LinearSearchByID(sctx, query) + r, err := s.LinearSearchByID(ctx, query) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse LinearSearchByID gRPC error response", + "failed to parse "+vald.LinearSearchByIDRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: query.GetConfig().GetRequestId(), ServingData: errdetails.Serialize(query), @@ -1413,13 +1414,13 @@ func (s *server) MultiLinearSearchByID(ctx context.Context, reqs *payload.Search wg.Wait() if errs != nil { st, msg, err := status.ParseError(errs, codes.Internal, - "failed to parse MultiLinearSearchByID gRPC error response", + "failed to parse "+vald.MultiLinearSearchByIDRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(rids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiLinearSearchByID", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiLinearSearchByIDRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -1442,7 +1443,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p uuid := req.GetVector().GetId() if len(uuid) == 0 { err = errors.ErrInvalidMetaDataConfig - err = status.WrapWithInvalidArgument("Insert API invalid uuid", err, + err = status.WrapWithInvalidArgument(vald.InsertRPCName+" API invalid uuid", err, &errdetails.RequestInfo{ ServingData: errdetails.Serialize(req), }, @@ -1455,7 +1456,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p }, }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Insert", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), }) if span != nil { @@ -1469,7 +1470,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p vl := len(vec) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("Insert API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.InsertRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), @@ -1498,13 +1499,13 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p err = errors.ErrMetaDataAlreadyExists(uuid) } st, msg, err := status.ParseError(err, codes.AlreadyExists, - fmt.Sprintf("error Insert API ID = %v already exists", uuid), + "error "+vald.InsertRPCName+" API ID = "+uuid+" already exists", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Exists", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName + "." + vald.ExistsRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -1539,7 +1540,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p emu := new(sync.Mutex) var errs error err = s.gateway.DoMulti(ctx, s.replica, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Insert/"+target) + ctx, span := trace.StartSpan(ctx, apiName+"."+vald.InsertRPCName+"/"+target) defer func() { if span != nil { span.End() @@ -1554,7 +1555,7 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p span.RecordError(err) span.SetAttributes(trace.StatusCodeCancelled( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.Insert.DoMulti/" + + "/vald.v1." + vald.InsertRPCName + ".DoMulti/" + target + " canceled: " + err.Error())...) span.SetStatus(trace.StatusError, err.Error()) } @@ -1565,20 +1566,20 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p span.RecordError(err) span.SetAttributes(trace.StatusCodeDeadlineExceeded( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.Insert.DoMulti/" + + "/vald.v1." + vald.InsertRPCName + ".DoMulti/" + target + " deadline_exceeded: " + err.Error())...) span.SetStatus(trace.StatusError, err.Error()) } return nil } st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse Insert gRPC error response", + "failed to parse "+vald.InsertRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Insert", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }) if span != nil { @@ -1613,13 +1614,13 @@ func (s *server) Insert(ctx context.Context, req *payload.Insert_Request) (ce *p } if errs != nil { st, msg, err := status.ParseError(errs, codes.Internal, - "failed to parse Insert gRPC error response", + "failed to parse "+vald.InsertRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Insert.DoMulti", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.InsertRPCName + ".DoMulti", ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -1644,7 +1645,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) func() interface{} { return new(payload.Insert_Request) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Insert_Request) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamInsert/id-"+req.GetVector().GetId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamInsertRPCName+"/id-"+req.GetVector().GetId()) defer func() { if sspan != nil { sspan.End() @@ -1652,7 +1653,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }() res, err := s.Insert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Insert gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.InsertRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -1672,7 +1673,7 @@ func (s *server) StreamInsert(stream vald.Insert_StreamInsertServer) (err error) }) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamInsert gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamInsertRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -1699,7 +1700,7 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ vl := len(vector) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("MultiInsert API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.MultiInsertRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(reqs), @@ -1728,13 +1729,13 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ err = errors.ErrMetaDataAlreadyExists(uuid) } st, msg, err := status.ParseError(err, codes.AlreadyExists, - fmt.Sprintf("error MultiInsert API ID = %v already exists", uuid), + "error "+vald.MultiInsertRPCName+" API ID = "+uuid+" already exists", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Exists", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiInsertRPCName + "." + vald.ExistsRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -1770,7 +1771,7 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ emu := new(sync.Mutex) var errs error err = s.gateway.DoMulti(ctx, s.replica, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { - ctx, span := trace.StartSpan(ctx, apiName+".MultiInsert/"+target) + ctx, span := trace.StartSpan(ctx, apiName+"."+vald.MultiInsertRPCName+"/"+target) defer func() { if span != nil { span.End() @@ -1785,7 +1786,7 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ span.RecordError(err) span.SetAttributes(trace.StatusCodeCancelled( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.MultiInsert.DoMulti/" + + "/vald.v1." + vald.MultiInsertRPCName + ".DoMulti/" + target + " canceled: " + err.Error())...) span.SetStatus(trace.StatusError, err.Error()) } @@ -1796,20 +1797,20 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ span.RecordError(err) span.SetAttributes(trace.StatusCodeDeadlineExceeded( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.MultiInsert.DoMulti/" + + "/vald.v1." + vald.MultiInsertRPCName + ".DoMulti/" + target + " deadline_exceeded: " + err.Error())...) span.SetStatus(trace.StatusError, err.Error()) } return nil } st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse MultiInsert gRPC error response", + "failed to parse "+vald.MultiInsertRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(ids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiInsert", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiInsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }) if span != nil { @@ -1844,13 +1845,13 @@ func (s *server) MultiInsert(ctx context.Context, reqs *payload.Insert_MultiRequ if errs != nil { st, msg, err := status.ParseError(errs, codes.Internal, - "failed to parse MultiInsert gRPC error response", + "failed to parse "+vald.MultiInsertRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(ids, ", "), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiInsert.DoMulti", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiInsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -1873,7 +1874,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * uuid := req.GetVector().GetId() if len(uuid) == 0 { err = errors.ErrInvalidMetaDataConfig - err = status.WrapWithInvalidArgument("Update API invalid uuid", err, + err = status.WrapWithInvalidArgument(vald.UpdateRPCName+" API invalid uuid", err, &errdetails.RequestInfo{ ServingData: errdetails.Serialize(req), }, @@ -1886,7 +1887,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * }, }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Update", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), }) if span != nil { @@ -1900,7 +1901,7 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * vl := len(vec) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("Update API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.UpdateRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), @@ -1932,13 +1933,13 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * err = errors.ErrObjectIDNotFound(uuid) } st, msg, err := status.ParseError(err, codes.NotFound, - fmt.Sprintf("error Update API ID = %v not fount", uuid), + "error "+vald.UpdateRPCName+" API ID = "+uuid+" not found", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Update.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + "." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -1953,13 +1954,13 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * err = errors.ErrSameVectorAlreadyExists(uuid, vec.GetVector(), req.GetVector().GetVector()) } st, msg, err := status.ParseError(err, codes.AlreadyExists, - fmt.Sprintf("error Update API ID = %v's same vector data already exists", uuid), + "error "+vald.UpdateRPCName+" API ID = "+uuid+"'s same vector data already exists", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + "." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -1994,13 +1995,13 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * res, err = s.Remove(ctx, rreq) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse Remove for Update gRPC error response", + "failed to parse "+vald.RemoveRPCName+" for "+vald.UpdateRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(rreq), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Update.Remove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + "." + vald.RemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2022,13 +2023,13 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res * res, err = s.Insert(ctx, ireq) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse Insert for Update gRPC error response", + "failed to parse "+vald.InsertRPCName+" for "+vald.UpdateRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(ireq), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Update.Insert", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpdateRPCName + "." + vald.InsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2053,7 +2054,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) func() interface{} { return new(payload.Update_Request) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Update_Request) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamUpdate/id-"+req.GetVector().GetId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamUpdateRPCName+"/id-"+req.GetVector().GetId()) defer func() { if sspan != nil { sspan.End() @@ -2061,7 +2062,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }() res, err := s.Update(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Update gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpdateRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -2081,7 +2082,7 @@ func (s *server) StreamUpdate(stream vald.Update_StreamUpdateServer) (err error) }) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamUpdate gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamUpdateRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -2108,7 +2109,7 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ vl := len(req.GetVector().GetVector()) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("MultiUpdate API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.MultiUpdateRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: req.GetVector().GetId(), ServingData: errdetails.Serialize(reqs), @@ -2140,13 +2141,13 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ err = errors.ErrObjectIDNotFound(uuid) } st, msg, err := status.ParseError(err, codes.NotFound, - fmt.Sprintf("error MultiUpdate API ID = %v not fount", uuid), + "error "+vald.MultiUpdateRPCName+" API ID = "+uuid+" not fount", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiUpdate.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiUpdateRPCName + "." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2197,13 +2198,13 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ }) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse MultiRemove for MultiUpdate gRPC error response", + "failed to parse "+vald.MultiRemoveRPCName+" for "+vald.MultiUpdateRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(ids, ","), ServingData: errdetails.Serialize(rreqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiUpdate.MultiRemove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiUpdateRPCName + "." + vald.MultiRemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2213,19 +2214,19 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ } return nil, err } - log.Debugf("uuids %v were removed from %v due to MultiUpdate. MultiInsert will be executed for them soon. Please see detail %#v", ids, locs.GetLocations(), locs) + log.Debugf("uuids %v were removed from %v due to MultiUpdate. "+vald.MultiInsertRPCName+" will be executed for them soon. Please see detail %#v", ids, locs.GetLocations(), locs) locs, err = s.MultiInsert(ctx, &payload.Insert_MultiRequest{ Requests: ireqs, }) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse MultiInsert for MultiUpdate gRPC error response", + "failed to parse "+vald.MultiInsertRPCName+" for "+vald.MultiUpdateRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(ids, ","), ServingData: errdetails.Serialize(ireqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiUpdate.MultiInsert", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiUpdateRPCName + "." + vald.MultiInsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2250,7 +2251,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * uuid := vec.GetId() if len(uuid) == 0 { err = errors.ErrInvalidMetaDataConfig - err = status.WrapWithInvalidArgument("Upsert API invalid uuid", err, + err = status.WrapWithInvalidArgument(vald.UpsertRPCName+" API invalid uuid", err, &errdetails.RequestInfo{ ServingData: errdetails.Serialize(req), }, @@ -2263,7 +2264,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * }, }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Upsert", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip), }) if span != nil { @@ -2277,7 +2278,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * vl := len(vec.GetVector()) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("Upsert API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.UpsertRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), @@ -2311,13 +2312,13 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * err = errors.ErrSameVectorAlreadyExists(uuid, vec.GetVector(), req.GetVector().GetVector()) } st, msg, err := status.ParseError(err, codes.AlreadyExists, - fmt.Sprintf("error Update for Upsert API ID = %v's same vector data already exists", uuid), + "error "+vald.UpdateRPCName+" for "+vald.UpsertRPCName+" API ID = "+uuid+"'s same vector data already exists", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpsertRPCName + "." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2336,7 +2337,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * var operation string if shouldInsert { - operation = "Insert" + operation = vald.InsertRPCName loc, err = s.Insert(ctx, &payload.Insert_Request{ Vector: vec, Config: &payload.Insert_Config{ @@ -2346,7 +2347,7 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * }, }) } else { - operation = "Update" + operation = vald.UpdateRPCName loc, err = s.Update(ctx, &payload.Update_Request{ Vector: vec, Config: &payload.Update_Config{ @@ -2359,13 +2360,13 @@ func (s *server) Upsert(ctx context.Context, req *payload.Upsert_Request) (loc * if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse "+operation+" for Upsert gRPC error response", + "failed to parse "+operation+" for "+vald.UpsertRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Upsert." + operation, + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.UpsertRPCName + "." + operation, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2389,7 +2390,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) func() interface{} { return new(payload.Upsert_Request) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Upsert_Request) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamUpsert/id-"+req.GetVector().GetId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamUpsertRPCName+"/id-"+req.GetVector().GetId()) defer func() { if sspan != nil { sspan.End() @@ -2397,7 +2398,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }() res, err := s.Upsert(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Upsert gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.UpsertRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -2417,7 +2418,7 @@ func (s *server) StreamUpsert(stream vald.Upsert_StreamUpsertServer) (err error) }) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamUpdate gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamUpsertRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -2446,7 +2447,7 @@ func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequ vl := len(vec.GetVector()) if vl < algorithm.MinimumVectorDimensionSize { err = errors.ErrInvalidDimensionSize(vl, 0) - err = status.WrapWithInvalidArgument("MultiUpsert API invalid vector argument", err, + err = status.WrapWithInvalidArgument(vald.MultiUpsertRPCName+" API invalid vector argument", err, &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), @@ -2580,13 +2581,13 @@ func (s *server) MultiUpsert(ctx context.Context, reqs *payload.Upsert_MultiRequ if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse MultiUpsert gRPC error response", + "failed to parse "+vald.MultiUpsertRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(ids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiUpsert", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiUpsertRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2615,13 +2616,13 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs err = errors.ErrObjectIDNotFound(id.GetId()) } st, msg, err := status.ParseError(err, codes.NotFound, - fmt.Sprintf("error Remove API ID = %v not found", id.GetId()), + "error "+vald.RemoveRPCName+" API ID = "+id.GetId()+" not found", &errdetails.RequestInfo{ RequestId: id.GetId(), ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Exists", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName + "." + vald.ExistsRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2653,7 +2654,7 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs Ips: make([]string, 0, s.replica), } err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) (err error) { - ctx, span := trace.StartSpan(ctx, apiName+".Remove/"+target) + ctx, span := trace.StartSpan(ctx, apiName+"."+vald.RemoveRPCName+"/"+target) defer func() { if span != nil { span.End() @@ -2662,13 +2663,13 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs loc, err := vc.Remove(ctx, req, copts...) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse Remove gRPC error response", + "failed to parse "+vald.RemoveRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: id.GetId(), ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Remove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }) if span != nil { @@ -2690,13 +2691,13 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs }) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse Remove gRPC error response", + "failed to parse "+vald.RemoveRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: id.GetId(), ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Remove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2708,13 +2709,13 @@ func (s *server) Remove(ctx context.Context, req *payload.Remove_Request) (locs } if len(locs.Ips) <= 0 { err = errors.ErrIndexNotFound - err = status.WrapWithNotFound("Remove API remove target not found", err, + err = status.WrapWithNotFound(vald.RemoveRPCName+" API remove target not found", err, &errdetails.RequestInfo{ RequestId: id.GetId(), ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Remove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.RemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -2738,7 +2739,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) func() interface{} { return new(payload.Remove_Request) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Remove_Request) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamRemove/id-"+req.GetId().GetId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamRemoveRPCName+"/id-"+req.GetId().GetId()) defer func() { if sspan != nil { sspan.End() @@ -2746,7 +2747,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }() res, err := s.Remove(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse Remove gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.RemoveRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -2766,7 +2767,7 @@ func (s *server) StreamRemove(stream vald.Remove_StreamRemoveServer) (err error) }) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamRemove gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamRemoveRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -2797,13 +2798,13 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ err = errors.ErrObjectIDNotFound(id.GetId()) } st, msg, err := status.ParseError(err, codes.NotFound, - fmt.Sprintf("MultiRemove API ID = %v not found", id.GetId()), + fmt.Sprintf(vald.MultiRemoveRPCName+" API ID = %v not found", id.GetId()), &errdetails.RequestInfo{ RequestId: id.GetId(), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.Exists", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiRemoveRPCName + "." + vald.ExistsRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2835,7 +2836,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ Locations: make([]*payload.Object_Location, 0, len(reqs.GetRequests())), } err = s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { - ctx, span := trace.StartSpan(ctx, apiName+".MultiRemove/"+target) + ctx, span := trace.StartSpan(ctx, apiName+"."+vald.MultiRemoveRPCName+"/"+target) defer func() { if span != nil { span.End() @@ -2850,7 +2851,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ span.RecordError(err) span.SetAttributes(trace.StatusCodeCancelled( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.MultiRemove.BroadCast/" + + "/vald.v1." + vald.MultiRemoveRPCName + ".BroadCast/" + target + " canceled: " + err.Error())...) span.SetStatus(trace.StatusError, err.Error()) } @@ -2861,7 +2862,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ span.RecordError(err) span.SetAttributes(trace.StatusCodeDeadlineExceeded( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.MultiRemove.BroadCast/" + + "/vald.v1." + vald.MultiRemoveRPCName + ".BroadCast/" + target + " deadline_exceeded: " + err.Error())...) span.SetStatus(trace.StatusError, err.Error()) } @@ -2874,7 +2875,7 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiRemove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiRemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }) if span != nil { @@ -2896,13 +2897,13 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ }) if err != nil { st, msg, err := status.ParseError(err, codes.Internal, - "failed to parse MultiRemove gRPC error response", + "failed to parse "+vald.MultiRemoveRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: strings.Join(ids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiRemove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiRemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -2914,13 +2915,13 @@ func (s *server) MultiRemove(ctx context.Context, reqs *payload.Remove_MultiRequ } if len(locs.Locations) <= 0 { err = errors.ErrIndexNotFound - err = status.WrapWithNotFound("MultiRemove API remove target not found", err, + err = status.WrapWithNotFound(vald.MultiRemoveRPCName+" API remove target not found", err, &errdetails.RequestInfo{ RequestId: strings.Join(ids, ","), ServingData: errdetails.Serialize(reqs), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.MultiRemove", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.MultiRemoveRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }) if span != nil { @@ -2949,7 +2950,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques ctx, cancel = context.WithCancel(ctx) var once sync.Once ech <- s.gateway.BroadCast(ctx, func(ctx context.Context, target string, vc vald.Client, copts ...grpc.CallOption) error { - sctx, sspan := trace.StartSpan(ctx, apiName+".GetObject/"+target) + sctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.GetObjectRPCName+"/"+target) defer func() { if span != nil { sspan.End() @@ -2964,7 +2965,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques sspan.RecordError(err) sspan.SetAttributes(trace.StatusCodeCancelled( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.GetObject.BroadCast/" + + "/vald.v1." + vald.GetObjectRPCName + ".BroadCast/" + target + " canceled: " + err.Error())...) sspan.SetStatus(trace.StatusError, err.Error()) } @@ -2975,7 +2976,7 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques sspan.RecordError(err) sspan.SetAttributes(trace.StatusCodeDeadlineExceeded( errdetails.ValdGRPCResourceTypePrefix + - "/vald.v1.GetObject.BroadCast/" + + "/vald.v1." + vald.GetObjectRPCName + ".BroadCast/" + target + " deadline_exceeded: " + err.Error())...) sspan.SetStatus(trace.StatusError, err.Error()) } @@ -2983,13 +2984,13 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques } uuid := req.GetId().GetId() st, msg, err := status.ParseError(err, codes.NotFound, - fmt.Sprintf("GetObject API ID = %s not found", uuid), + vald.GetObjectRPCName+" API ID = "+uuid+"'s not found", &errdetails.RequestInfo{ RequestId: uuid, ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %s", apiName, s.name, s.ip, target), }, info.Get()) if span != nil { @@ -3021,13 +3022,13 @@ func (s *server) GetObject(ctx context.Context, req *payload.Object_VectorReques if err != nil || vec == nil || vec.GetId() == "" || vec.GetVector() == nil { err = errors.ErrObjectNotFound(err, req.GetId().GetId()) st, msg, err := status.ParseError(err, codes.NotFound, - "failed to parse GetObject gRPC error response", + "failed to parse "+vald.GetObjectRPCName+" gRPC error response", &errdetails.RequestInfo{ RequestId: req.GetId().GetId(), ServingData: errdetails.Serialize(req), }, &errdetails.ResourceInfo{ - ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1.GetObject", + ResourceType: errdetails.ValdGRPCResourceTypePrefix + "/vald.v1." + vald.GetObjectRPCName, ResourceName: fmt.Sprintf("%s: %s(%s) to %v", apiName, s.name, s.ip, s.gateway.Addrs(ctx)), }, info.Get()) if span != nil { @@ -3051,7 +3052,7 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err func() interface{} { return new(payload.Object_VectorRequest) }, func(ctx context.Context, data interface{}) (interface{}, error) { req := data.(*payload.Object_VectorRequest) - ctx, sspan := trace.StartSpan(ctx, apiName+".StreamGetObject/id-"+req.GetId().GetId()) + ctx, sspan := trace.StartSpan(ctx, apiName+"."+vald.StreamGetObjectRPCName+"/id-"+req.GetId().GetId()) defer func() { if sspan != nil { sspan.End() @@ -3059,7 +3060,7 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }() res, err := s.GetObject(ctx, req) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse GetObject gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.GetObjectRPCName+" gRPC error response") if sspan != nil { sspan.RecordError(err) sspan.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...) @@ -3079,7 +3080,7 @@ func (s *server) StreamGetObject(stream vald.Object_StreamGetObjectServer) (err }) if err != nil { - st, msg, err := status.ParseError(err, codes.Internal, "failed to parse StreamGetObject gRPC error response") + st, msg, err := status.ParseError(err, codes.Internal, "failed to parse "+vald.StreamGetObjectRPCName+" gRPC error response") if span != nil { span.RecordError(err) span.SetAttributes(trace.FromGRPCStatus(st.Code(), msg)...)