Skip to content

Commit

Permalink
fix: Replace agent Remove RPC to Vald Remove RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
highpon committed Oct 20, 2024
1 parent ff65493 commit 4093d71
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 96 deletions.
1 change: 0 additions & 1 deletion apis/grpc/v1/agent/core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package core

const (
CreateIndexRPCName = "CreateIndex"
DeleteIndexRPCName = "DeleteIndex"
SaveIndexRPCName = "SaveIndex"
CreateAndSaveIndexRPCName = "CreateAndSaveIndex"
)
45 changes: 0 additions & 45 deletions apis/grpc/v1/agent/core/agent_vtproto.pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ const _ = grpc.SupportPackageIsVersion7
type AgentClient interface {
// Represent the creating index RPC.
CreateIndex(ctx context.Context, in *payload.Control_CreateIndexRequest, opts ...grpc.CallOption) (*payload.Empty, error)
// Represent the deleting index RPC.
DeleteIndex(ctx context.Context, in *payload.Remove_Request, opts ...grpc.CallOption) (*payload.Empty, error)
// Represent the saving index RPC.
SaveIndex(ctx context.Context, in *payload.Empty, opts ...grpc.CallOption) (*payload.Empty, error)
// Represent the creating and saving index RPC.
Expand All @@ -71,17 +69,6 @@ func (c *agentClient) CreateIndex(
return out, nil
}

func (c *agentClient) DeleteIndex(
ctx context.Context, in *payload.Remove_Request, opts ...grpc.CallOption,
) (*payload.Empty, error) {
out := new(payload.Empty)
err := c.cc.Invoke(ctx, "/core.v1.Agent/DeleteIndex", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

func (c *agentClient) SaveIndex(
ctx context.Context, in *payload.Empty, opts ...grpc.CallOption,
) (*payload.Empty, error) {
Expand Down Expand Up @@ -110,8 +97,6 @@ func (c *agentClient) CreateAndSaveIndex(
type AgentServer interface {
// Represent the creating index RPC.
CreateIndex(context.Context, *payload.Control_CreateIndexRequest) (*payload.Empty, error)
// Represent the deleting index RPC.
DeleteIndex(context.Context, *payload.Remove_Request) (*payload.Empty, error)
// Represent the saving index RPC.
SaveIndex(context.Context, *payload.Empty) (*payload.Empty, error)
// Represent the creating and saving index RPC.
Expand All @@ -128,12 +113,6 @@ func (UnimplementedAgentServer) CreateIndex(
return nil, status.Errorf(codes.Unimplemented, "method CreateIndex not implemented")
}

func (UnimplementedAgentServer) DeleteIndex(
context.Context, *payload.Remove_Request,
) (*payload.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteIndex not implemented")
}

func (UnimplementedAgentServer) SaveIndex(context.Context, *payload.Empty) (*payload.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method SaveIndex not implemented")
}
Expand Down Expand Up @@ -176,26 +155,6 @@ func _Agent_CreateIndex_Handler(
return interceptor(ctx, in, info, handler)
}

func _Agent_DeleteIndex_Handler(
srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor,
) (any, error) {
in := new(payload.Remove_Request)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AgentServer).DeleteIndex(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/core.v1.Agent/DeleteIndex",
}
handler := func(ctx context.Context, req any) (any, error) {
return srv.(AgentServer).DeleteIndex(ctx, req.(*payload.Remove_Request))
}
return interceptor(ctx, in, info, handler)
}

func _Agent_SaveIndex_Handler(
srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor,
) (any, error) {
Expand Down Expand Up @@ -247,10 +206,6 @@ var Agent_ServiceDesc = grpc.ServiceDesc{
MethodName: "CreateIndex",
Handler: _Agent_CreateIndex_Handler,
},
{
MethodName: "DeleteIndex",
Handler: _Agent_DeleteIndex_Handler,
},
{
MethodName: "SaveIndex",
Handler: _Agent_SaveIndex_Handler,
Expand Down
2 changes: 1 addition & 1 deletion apis/grpc/v1/vald/remove_vtproto.pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const _ = grpc.SupportPackageIsVersion7

// RemoveClient is the client API for Remove service.
//
// Foaaar semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type RemoveClient interface {
// A method to remove an indexed vector.
Remove(ctx context.Context, in *payload.Remove_Request, opts ...grpc.CallOption) (*payload.Object_Location, error)
Expand Down
29 changes: 0 additions & 29 deletions internal/client/v1/client/agent/core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,6 @@ func (c *agentClient) CreateIndex(
return nil, err
}

func (c *agentClient) DeleteIndex(
ctx context.Context, req *client.RemoveRequest, _ ...grpc.CallOption,
) (*client.Empty, error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/client/"+agent.DeleteIndexRPCName), apiName+"/"+agent.DeleteIndexRPCName)
defer func() {
if span != nil {
span.End()
}
}()
_, err := c.c.RoundRobin(ctx, func(ctx context.Context,
conn *grpc.ClientConn, copts ...grpc.CallOption,
) (any, error) {
return agent.NewAgentClient(conn).DeleteIndex(ctx, req, copts...)
})
return nil, err
}

func (c *agentClient) SaveIndex(
ctx context.Context, _ *client.Empty, _ ...grpc.CallOption,
) (*client.Empty, error) {
Expand Down Expand Up @@ -173,18 +156,6 @@ func (c *singleAgentClient) CreateIndex(
return c.ac.CreateIndex(ctx, req, opts...)
}

func (c *singleAgentClient) DeleteIndex(
ctx context.Context, req *client.RemoveRequest, opts ...grpc.CallOption,
) (*client.Empty, error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "internal/singleClient/"+agent.DeleteIndexRPCName), apiName+"/"+agent.DeleteIndexRPCName)
defer func() {
if span != nil {
span.End()
}
}()
return c.ac.DeleteIndex(ctx, req, opts...)
}

func (c *singleAgentClient) SaveIndex(
ctx context.Context, _ *client.Empty, opts ...grpc.CallOption,
) (*client.Empty, error) {
Expand Down
45 changes: 25 additions & 20 deletions pkg/index/job/deletion/service/deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strings"
"sync"

agent "github.com/vdaas/vald/apis/grpc/v1/agent/core"
"github.com/vdaas/vald/apis/grpc/v1/payload"
"github.com/vdaas/vald/apis/grpc/v1/vald"
"github.com/vdaas/vald/internal/client/v1/client/discoverer"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
Expand All @@ -19,7 +19,7 @@ import (

const (
apiName = "vald/index/job/delete"
grpcMethodName = "core.v1.Agent/" + agent.DeleteIndexRPCName
grpcMethodName = "vald.v1.Remove/" + vald.RemoveRPCName
)

// Deleter represents an interface for deleting.
Expand All @@ -33,8 +33,9 @@ var defaultOpts = []Option{
}

type index struct {
client discoverer.Client
targetAddrs []string
client discoverer.Client
targetAddrs []string
targetIndexID string

concurrency int
}
Expand Down Expand Up @@ -84,10 +85,10 @@ func (idx *index) Start(ctx context.Context) error {
}()

err := idx.doDeleteIndex(ctx,
func(ctx context.Context, ac agent.AgentClient, copts ...grpc.CallOption) (*payload.Empty, error) {
return ac.DeleteIndex(ctx, &payload.Remove_Request{
func(ctx context.Context, rc vald.RemoveClient, copts ...grpc.CallOption) (*payload.Object_Location, error) {
return rc.Remove(ctx, &payload.Remove_Request{
Id: &payload.Object_ID{
Id: "hoge",
Id: idx.targetIndexID,
},
}, copts...)
},
Expand All @@ -97,12 +98,12 @@ func (idx *index) Start(ctx context.Context) error {
switch {
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
agent.DeleteIndexRPCName+" API connection not found", err,
vald.RemoveRPCName+" API connection not found", err,
)
attrs = trace.StatusCodeInternal(err.Error())
case errors.Is(err, errors.ErrGRPCTargetAddrNotFound):
err = status.WrapWithInternal(
agent.DeleteIndexRPCName+" API connection target address \""+strings.Join(idx.targetAddrs, ",")+"\" not found", err,
vald.RemoveRPCName+" API connection target address \""+strings.Join(idx.targetAddrs, ",")+"\" not found", err,
)
attrs = trace.StatusCodeInternal(err.Error())
default:
Expand All @@ -111,7 +112,7 @@ func (idx *index) Start(ctx context.Context) error {
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
"failed to parse "+agent.DeleteIndexRPCName+" gRPC error response",
"failed to parse "+vald.RemoveRPCName+" gRPC error response",
)
attrs = trace.FromGRPCStatus(st.Code(), msg)
}
Expand All @@ -128,7 +129,7 @@ func (idx *index) Start(ctx context.Context) error {

func (idx *index) doDeleteIndex(
ctx context.Context,
fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error),
fn func(_ context.Context, _ vald.RemoveClient, _ ...grpc.CallOption) (*payload.Object_Location, error),
) (errs error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doDeleteIndex")
defer func() {
Expand All @@ -153,34 +154,34 @@ func (idx *index) doDeleteIndex(
var emu sync.Mutex
err := idx.client.GetClient().OrderedRangeConcurrent(ctx, targetAddrs, idx.concurrency,
func(ctx context.Context, target string, conn *grpc.ClientConn, copts ...grpc.CallOption) error {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "OrderedRangeConcurrent/"+target), agent.DeleteIndexRPCName+"/"+target)
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "OrderedRangeConcurrent/"+target), vald.RemoveRPCName+"/"+target)
defer func() {
if span != nil {
span.End()
}
}()
_, err := fn(ctx, agent.NewAgentClient(conn), copts...)
_, err := fn(ctx, vald.NewRemoveClient(conn), copts...)
if err != nil {
var attrs trace.Attributes
switch {
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(
agent.DeleteIndexRPCName+" API canceld", err,
vald.RemoveRPCName+" API canceld", err,
)
attrs = trace.StatusCodeCancelled(err.Error())
case errors.Is(err, context.DeadlineExceeded):
err = status.WrapWithCanceled(
agent.DeleteIndexRPCName+" API deadline exceeded", err,
vald.RemoveRPCName+" API deadline exceeded", err,
)
attrs = trace.StatusCodeDeadlineExceeded(err.Error())
case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")):
err = status.WrapWithInternal(
agent.DeleteIndexRPCName+" API connection not found", err,
vald.RemoveRPCName+" API connection not found", err,
)
attrs = trace.StatusCodeInternal(err.Error())
case errors.Is(err, errors.ErrTargetNotFound):
err = status.WrapWithInvalidArgument(
agent.DeleteIndexRPCName+" API target not found", err,
vald.RemoveRPCName+" API target not found", err,
)
attrs = trace.StatusCodeInternal(err.Error())
default:
Expand All @@ -189,15 +190,19 @@ func (idx *index) doDeleteIndex(
msg string
)
st, msg, err = status.ParseError(err, codes.Internal,
"failed to parse "+agent.DeleteIndexRPCName+" gRPC error response",
"failed to parse "+vald.RemoveRPCName+" gRPC error response",
)
if st != nil && err != nil && st.Code() == codes.FailedPrecondition {
log.Warnf("DeleteIndex of %s skipped, message: %s, err: %v", target, st.Message(), errors.Join(st.Err(), err))
log.Warnf("DeleteIndex of %s skipped, indexID:%s, message: %s, err: %v", target, idx.targetIndexID, st.Message(), errors.Join(st.Err(), err))
return nil
}
if st != nil && err != nil && st.Code() == codes.NotFound {
log.Warn("DeleteIndex of %s skipped, indexID: %s, message: %s, err: %v", target, idx.targetIndexID, st.Message(), errors.Join(st.Err(), err))
return nil
}
attrs = trace.FromGRPCStatus(st.Code(), msg)
}
log.Warnf("an error occurred in (%s) during indexing: %v", target, err)
log.Warnf("an error occurred in (%s) deleting index(%s): %v", target, idx.targetIndexID, err)
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
Expand Down
8 changes: 8 additions & 0 deletions pkg/index/job/deletion/service/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,11 @@ func WithTargetAddrs(addrs ...string) Option {
return nil
}
}

// WithTargetIndexID returns Option that sets target deleting index ID.
func WithTargetIndexID(indexID string) Option {
return func(idx *index) error {
idx.targetIndexID = indexID
return nil
}
}
1 change: 1 addition & 0 deletions pkg/index/job/deletion/usecase/deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func New(cfg *config.Data) (_ runner.Runner, err error) {
service.WithDiscoverer(discoverer),
service.WithIndexingConcurrency(cfg.Deletion.Concurrency),
service.WithTargetAddrs(cfg.Deletion.TargetAddrs...),
service.WithTargetIndexID(cfg.Deletion.IndexID),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 4093d71

Please sign in to comment.