From 4093d7144ade705554b349b5c7ef54535ebb15d3 Mon Sep 17 00:00:00 2001 From: HighPon Date: Sun, 20 Oct 2024 06:51:10 +0000 Subject: [PATCH] fix: Replace agent Remove RPC to Vald Remove RPC --- apis/grpc/v1/agent/core/agent.go | 1 - apis/grpc/v1/agent/core/agent_vtproto.pb.go | 45 ------------------- apis/grpc/v1/vald/remove_vtproto.pb.go | 2 +- .../client/v1/client/agent/core/client.go | 29 ------------ pkg/index/job/deletion/service/deleter.go | 45 ++++++++++--------- pkg/index/job/deletion/service/options.go | 8 ++++ pkg/index/job/deletion/usecase/deletion.go | 1 + 7 files changed, 35 insertions(+), 96 deletions(-) diff --git a/apis/grpc/v1/agent/core/agent.go b/apis/grpc/v1/agent/core/agent.go index fabd1570885..caf77ef2a27 100644 --- a/apis/grpc/v1/agent/core/agent.go +++ b/apis/grpc/v1/agent/core/agent.go @@ -19,7 +19,6 @@ package core const ( CreateIndexRPCName = "CreateIndex" - DeleteIndexRPCName = "DeleteIndex" SaveIndexRPCName = "SaveIndex" CreateAndSaveIndexRPCName = "CreateAndSaveIndex" ) diff --git a/apis/grpc/v1/agent/core/agent_vtproto.pb.go b/apis/grpc/v1/agent/core/agent_vtproto.pb.go index e94800ab1e8..93f689fa9c5 100644 --- a/apis/grpc/v1/agent/core/agent_vtproto.pb.go +++ b/apis/grpc/v1/agent/core/agent_vtproto.pb.go @@ -44,8 +44,6 @@ const _ = grpc.SupportPackageIsVersion7 type AgentClient interface { // Represent the creating index RPC. CreateIndex(ctx context.Context, in *payload.Control_CreateIndexRequest, opts ...grpc.CallOption) (*payload.Empty, error) - // Represent the deleting index RPC. - DeleteIndex(ctx context.Context, in *payload.Remove_Request, opts ...grpc.CallOption) (*payload.Empty, error) // Represent the saving index RPC. SaveIndex(ctx context.Context, in *payload.Empty, opts ...grpc.CallOption) (*payload.Empty, error) // Represent the creating and saving index RPC. @@ -71,17 +69,6 @@ func (c *agentClient) CreateIndex( return out, nil } -func (c *agentClient) DeleteIndex( - ctx context.Context, in *payload.Remove_Request, opts ...grpc.CallOption, -) (*payload.Empty, error) { - out := new(payload.Empty) - err := c.cc.Invoke(ctx, "/core.v1.Agent/DeleteIndex", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *agentClient) SaveIndex( ctx context.Context, in *payload.Empty, opts ...grpc.CallOption, ) (*payload.Empty, error) { @@ -110,8 +97,6 @@ func (c *agentClient) CreateAndSaveIndex( type AgentServer interface { // Represent the creating index RPC. CreateIndex(context.Context, *payload.Control_CreateIndexRequest) (*payload.Empty, error) - // Represent the deleting index RPC. - DeleteIndex(context.Context, *payload.Remove_Request) (*payload.Empty, error) // Represent the saving index RPC. SaveIndex(context.Context, *payload.Empty) (*payload.Empty, error) // Represent the creating and saving index RPC. @@ -128,12 +113,6 @@ func (UnimplementedAgentServer) CreateIndex( return nil, status.Errorf(codes.Unimplemented, "method CreateIndex not implemented") } -func (UnimplementedAgentServer) DeleteIndex( - context.Context, *payload.Remove_Request, -) (*payload.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method DeleteIndex not implemented") -} - func (UnimplementedAgentServer) SaveIndex(context.Context, *payload.Empty) (*payload.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method SaveIndex not implemented") } @@ -176,26 +155,6 @@ func _Agent_CreateIndex_Handler( return interceptor(ctx, in, info, handler) } -func _Agent_DeleteIndex_Handler( - srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor, -) (any, error) { - in := new(payload.Remove_Request) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(AgentServer).DeleteIndex(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/core.v1.Agent/DeleteIndex", - } - handler := func(ctx context.Context, req any) (any, error) { - return srv.(AgentServer).DeleteIndex(ctx, req.(*payload.Remove_Request)) - } - return interceptor(ctx, in, info, handler) -} - func _Agent_SaveIndex_Handler( srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor, ) (any, error) { @@ -247,10 +206,6 @@ var Agent_ServiceDesc = grpc.ServiceDesc{ MethodName: "CreateIndex", Handler: _Agent_CreateIndex_Handler, }, - { - MethodName: "DeleteIndex", - Handler: _Agent_DeleteIndex_Handler, - }, { MethodName: "SaveIndex", Handler: _Agent_SaveIndex_Handler, diff --git a/apis/grpc/v1/vald/remove_vtproto.pb.go b/apis/grpc/v1/vald/remove_vtproto.pb.go index 254c43f43bd..68219d9faa2 100644 --- a/apis/grpc/v1/vald/remove_vtproto.pb.go +++ b/apis/grpc/v1/vald/remove_vtproto.pb.go @@ -40,7 +40,7 @@ const _ = grpc.SupportPackageIsVersion7 // RemoveClient is the client API for Remove service. // -// Foaaar semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type RemoveClient interface { // A method to remove an indexed vector. Remove(ctx context.Context, in *payload.Remove_Request, opts ...grpc.CallOption) (*payload.Object_Location, error) diff --git a/internal/client/v1/client/agent/core/client.go b/internal/client/v1/client/agent/core/client.go index 07f4ed820f3..10874ac9b27 100644 --- a/internal/client/v1/client/agent/core/client.go +++ b/internal/client/v1/client/agent/core/client.go @@ -110,23 +110,6 @@ func (c *agentClient) CreateIndex( return nil, err } -func (c *agentClient) DeleteIndex( - ctx context.Context, req *client.RemoveRequest, _ ...grpc.CallOption, -) (*client.Empty, error) { - ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/client/"+agent.DeleteIndexRPCName), apiName+"/"+agent.DeleteIndexRPCName) - defer func() { - if span != nil { - span.End() - } - }() - _, err := c.c.RoundRobin(ctx, func(ctx context.Context, - conn *grpc.ClientConn, copts ...grpc.CallOption, - ) (any, error) { - return agent.NewAgentClient(conn).DeleteIndex(ctx, req, copts...) - }) - return nil, err -} - func (c *agentClient) SaveIndex( ctx context.Context, _ *client.Empty, _ ...grpc.CallOption, ) (*client.Empty, error) { @@ -173,18 +156,6 @@ func (c *singleAgentClient) CreateIndex( return c.ac.CreateIndex(ctx, req, opts...) } -func (c *singleAgentClient) DeleteIndex( - ctx context.Context, req *client.RemoveRequest, opts ...grpc.CallOption, -) (*client.Empty, error) { - ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/singleClient/"+agent.DeleteIndexRPCName), apiName+"/"+agent.DeleteIndexRPCName) - defer func() { - if span != nil { - span.End() - } - }() - return c.ac.DeleteIndex(ctx, req, opts...) -} - func (c *singleAgentClient) SaveIndex( ctx context.Context, _ *client.Empty, opts ...grpc.CallOption, ) (*client.Empty, error) { diff --git a/pkg/index/job/deletion/service/deleter.go b/pkg/index/job/deletion/service/deleter.go index ffb37d87500..373817a7c7b 100644 --- a/pkg/index/job/deletion/service/deleter.go +++ b/pkg/index/job/deletion/service/deleter.go @@ -6,8 +6,8 @@ import ( "strings" "sync" - agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/apis/grpc/v1/vald" "github.com/vdaas/vald/internal/client/v1/client/discoverer" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" @@ -19,7 +19,7 @@ import ( const ( apiName = "vald/index/job/delete" - grpcMethodName = "core.v1.Agent/" + agent.DeleteIndexRPCName + grpcMethodName = "vald.v1.Remove/" + vald.RemoveRPCName ) // Deleter represents an interface for deleting. @@ -33,8 +33,9 @@ var defaultOpts = []Option{ } type index struct { - client discoverer.Client - targetAddrs []string + client discoverer.Client + targetAddrs []string + targetIndexID string concurrency int } @@ -84,10 +85,10 @@ func (idx *index) Start(ctx context.Context) error { }() err := idx.doDeleteIndex(ctx, - func(ctx context.Context, ac agent.AgentClient, copts ...grpc.CallOption) (*payload.Empty, error) { - return ac.DeleteIndex(ctx, &payload.Remove_Request{ + func(ctx context.Context, rc vald.RemoveClient, copts ...grpc.CallOption) (*payload.Object_Location, error) { + return rc.Remove(ctx, &payload.Remove_Request{ Id: &payload.Object_ID{ - Id: "hoge", + Id: idx.targetIndexID, }, }, copts...) }, @@ -97,12 +98,12 @@ func (idx *index) Start(ctx context.Context) error { switch { case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): err = status.WrapWithInternal( - agent.DeleteIndexRPCName+" API connection not found", err, + vald.RemoveRPCName+" API connection not found", err, ) attrs = trace.StatusCodeInternal(err.Error()) case errors.Is(err, errors.ErrGRPCTargetAddrNotFound): err = status.WrapWithInternal( - agent.DeleteIndexRPCName+" API connection target address \""+strings.Join(idx.targetAddrs, ",")+"\" not found", err, + vald.RemoveRPCName+" API connection target address \""+strings.Join(idx.targetAddrs, ",")+"\" not found", err, ) attrs = trace.StatusCodeInternal(err.Error()) default: @@ -111,7 +112,7 @@ func (idx *index) Start(ctx context.Context) error { msg string ) st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+agent.DeleteIndexRPCName+" gRPC error response", + "failed to parse "+vald.RemoveRPCName+" gRPC error response", ) attrs = trace.FromGRPCStatus(st.Code(), msg) } @@ -128,7 +129,7 @@ func (idx *index) Start(ctx context.Context) error { func (idx *index) doDeleteIndex( ctx context.Context, - fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error), + fn func(_ context.Context, _ vald.RemoveClient, _ ...grpc.CallOption) (*payload.Object_Location, error), ) (errs error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doDeleteIndex") defer func() { @@ -153,34 +154,34 @@ func (idx *index) doDeleteIndex( var emu sync.Mutex err := idx.client.GetClient().OrderedRangeConcurrent(ctx, targetAddrs, idx.concurrency, func(ctx context.Context, target string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { - ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "OrderedRangeConcurrent/"+target), agent.DeleteIndexRPCName+"/"+target) + ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "OrderedRangeConcurrent/"+target), vald.RemoveRPCName+"/"+target) defer func() { if span != nil { span.End() } }() - _, err := fn(ctx, agent.NewAgentClient(conn), copts...) + _, err := fn(ctx, vald.NewRemoveClient(conn), copts...) if err != nil { var attrs trace.Attributes switch { case errors.Is(err, context.Canceled): err = status.WrapWithCanceled( - agent.DeleteIndexRPCName+" API canceld", err, + vald.RemoveRPCName+" API canceld", err, ) attrs = trace.StatusCodeCancelled(err.Error()) case errors.Is(err, context.DeadlineExceeded): err = status.WrapWithCanceled( - agent.DeleteIndexRPCName+" API deadline exceeded", err, + vald.RemoveRPCName+" API deadline exceeded", err, ) attrs = trace.StatusCodeDeadlineExceeded(err.Error()) case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): err = status.WrapWithInternal( - agent.DeleteIndexRPCName+" API connection not found", err, + vald.RemoveRPCName+" API connection not found", err, ) attrs = trace.StatusCodeInternal(err.Error()) case errors.Is(err, errors.ErrTargetNotFound): err = status.WrapWithInvalidArgument( - agent.DeleteIndexRPCName+" API target not found", err, + vald.RemoveRPCName+" API target not found", err, ) attrs = trace.StatusCodeInternal(err.Error()) default: @@ -189,15 +190,19 @@ func (idx *index) doDeleteIndex( msg string ) st, msg, err = status.ParseError(err, codes.Internal, - "failed to parse "+agent.DeleteIndexRPCName+" gRPC error response", + "failed to parse "+vald.RemoveRPCName+" gRPC error response", ) if st != nil && err != nil && st.Code() == codes.FailedPrecondition { - log.Warnf("DeleteIndex of %s skipped, message: %s, err: %v", target, st.Message(), errors.Join(st.Err(), err)) + log.Warnf("DeleteIndex of %s skipped, indexID:%s, message: %s, err: %v", target, idx.targetIndexID, st.Message(), errors.Join(st.Err(), err)) + return nil + } + if st != nil && err != nil && st.Code() == codes.NotFound { + log.Warn("DeleteIndex of %s skipped, indexID: %s, message: %s, err: %v", target, idx.targetIndexID, st.Message(), errors.Join(st.Err(), err)) return nil } attrs = trace.FromGRPCStatus(st.Code(), msg) } - log.Warnf("an error occurred in (%s) during indexing: %v", target, err) + log.Warnf("an error occurred in (%s) deleting index(%s): %v", target, idx.targetIndexID, err) if span != nil { span.RecordError(err) span.SetAttributes(attrs...) diff --git a/pkg/index/job/deletion/service/options.go b/pkg/index/job/deletion/service/options.go index 6bd1eef6ab4..ccc6ecffed2 100644 --- a/pkg/index/job/deletion/service/options.go +++ b/pkg/index/job/deletion/service/options.go @@ -38,3 +38,11 @@ func WithTargetAddrs(addrs ...string) Option { return nil } } + +// WithTargetIndexID returns Option that sets target deleting index ID. +func WithTargetIndexID(indexID string) Option { + return func(idx *index) error { + idx.targetIndexID = indexID + return nil + } +} diff --git a/pkg/index/job/deletion/usecase/deletion.go b/pkg/index/job/deletion/usecase/deletion.go index e404bf5888f..ace04d5e474 100644 --- a/pkg/index/job/deletion/usecase/deletion.go +++ b/pkg/index/job/deletion/usecase/deletion.go @@ -86,6 +86,7 @@ func New(cfg *config.Data) (_ runner.Runner, err error) { service.WithDiscoverer(discoverer), service.WithIndexingConcurrency(cfg.Deletion.Concurrency), service.WithTargetAddrs(cfg.Deletion.TargetAddrs...), + service.WithTargetIndexID(cfg.Deletion.IndexID), ) if err != nil { return nil, err