diff --git a/charts/vald/templates/index/job/creation/configmap.yaml b/charts/vald/templates/index/job/creation/configmap.yaml index b78887e9e6..fbd2850260 100644 --- a/charts/vald/templates/index/job/creation/configmap.yaml +++ b/charts/vald/templates/index/job/creation/configmap.yaml @@ -51,7 +51,12 @@ data: agent_namespace: {{ $creator.agent_namespace | quote }} node_name: {{ $creator.node_name | quote }} concurrency: {{ $creator.concurrency }} - target_addrs: {{ $creator.target_addrs }} + {{- if $creator.target_addrs }} + target_addrs: + {{- toYaml $creator.target_addrs | nindent 8 }} + {{- else }} + target_addrs: [] + {{- end }} discoverer: duration: {{ $creator.discoverer.duration }} client: diff --git a/charts/vald/templates/index/job/save/configmap.yaml b/charts/vald/templates/index/job/save/configmap.yaml index 6813266e86..c1dc8b8d09 100644 --- a/charts/vald/templates/index/job/save/configmap.yaml +++ b/charts/vald/templates/index/job/save/configmap.yaml @@ -51,7 +51,12 @@ data: agent_namespace: {{ $saver.agent_namespace | quote }} node_name: {{ $saver.node_name | quote }} concurrency: {{ $saver.concurrency }} - target_addrs: {{ $saver.target_addrs }} + {{- if $saver.target_addrs }} + target_addrs: + {{- toYaml $saver.target_addrs | nindent 8 }} + {{- else }} + target_addrs: [] + {{- end }} discoverer: duration: {{ $saver.discoverer.duration }} client: diff --git a/pkg/index/job/creation/service/indexer.go b/pkg/index/job/creation/service/indexer.go index 591c0c4074..83ac1eb4e8 100644 --- a/pkg/index/job/creation/service/indexer.go +++ b/pkg/index/job/creation/service/indexer.go @@ -42,9 +42,8 @@ type Indexer interface { } type index struct { - client discoverer.Client - targetAddrs []string - targetAddrList map[string]bool + client discoverer.Client + targetAddrs []string creationPoolSize uint32 concurrency int @@ -64,13 +63,23 @@ func New(opts ...Option) (Indexer, error) { log.Warn(oerr) } } - idx.targetAddrList = make(map[string]bool, len(idx.targetAddrs)) - for _, addr := range idx.targetAddrs { - idx.targetAddrList[addr] = true - } + idx.targetAddrs = delDuplicateAddrs(idx.targetAddrs) return idx, nil } +func delDuplicateAddrs(targetAddrs []string) []string { + addrs := make([]string, 0, len(targetAddrs)) + exist := make(map[string]bool) + + for _, addr := range targetAddrs { + if !exist[addr] { + addrs = append(addrs, addr) + exist[addr] = true + } + } + return addrs +} + // StartClient starts the gRPC client. func (idx *index) StartClient(ctx context.Context) (<-chan error, error) { return idx.client.Start(ctx) @@ -126,6 +135,7 @@ func (idx *index) Start(ctx context.Context) error { return nil } +// skipcq: GO-R1005 func (idx *index) doCreateIndex(ctx context.Context, fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error)) (errs error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doCreateIndex") defer func() { @@ -136,12 +146,14 @@ func (idx *index) doCreateIndex(ctx context.Context, fn func(_ context.Context, targetAddrs := idx.client.GetAddrs(ctx) if len(idx.targetAddrs) != 0 { - targetAddrs = idx.extractTargetAddrs(targetAddrs) - - // If targetAddrs is empty, an invalid target addresses may be registered in targetAddrList. - if len(targetAddrs) == 0 { - return errors.ErrGRPCTargetAddrNotFound + // If target addresses is specified, that addresses are used in priority. + for _, addr := range idx.targetAddrs { + log.Infof("connect to target agent (%s)", addr) + if _, err := idx.client.GetClient().Connect(ctx, addr); err != nil { + return err + } } + targetAddrs = idx.targetAddrs } log.Infof("target agent addrs: %v", targetAddrs) @@ -207,16 +219,3 @@ func (idx *index) doCreateIndex(ctx context.Context, fn func(_ context.Context, ) return errors.Join(err, errs) } - -// extractTargetAddresses filters and extracts target addresses registered in targetAddrList from the given address list. -func (idx *index) extractTargetAddrs(addrs []string) []string { - res := make([]string, 0, len(addrs)) - for _, addr := range addrs { - if !idx.targetAddrList[addr] { - log.Warnf("the gRPC target address not found: %s", addr) - } else { - res = append(res, addr) - } - } - return res -} diff --git a/pkg/index/job/creation/service/indexer_test.go b/pkg/index/job/creation/service/indexer_test.go index 3bca6048fc..aec44fdbc9 100644 --- a/pkg/index/job/creation/service/indexer_test.go +++ b/pkg/index/job/creation/service/indexer_test.go @@ -22,6 +22,7 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/pool" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/test/goleak" clientmock "github.com/vdaas/vald/internal/test/mock/client" @@ -36,7 +37,6 @@ func Test_index_Start(t *testing.T) { type fields struct { client discoverer.Client targetAddrs []string - targetAddrList map[string]bool creationPoolSize uint32 concurrency int } @@ -68,7 +68,6 @@ func Test_index_Start(t *testing.T) { args: args{ ctx: context.Background(), }, - fields: fields{ client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { @@ -88,37 +87,35 @@ func Test_index_Start(t *testing.T) { } }(), func() test { - addrs := []string{ - "127.0.0.1:8080", - } return test{ - name: "Fail: when there is an error wrapped with gRPC status in the indexing request process", + name: "Success: when a target addresses (targetAddrs) is given and there are no errors in the indexing request process", args: args{ ctx: context.Background(), }, fields: fields{ + targetAddrs: []string{ + "127.0.0.1:8080", + "127.0.0.1:8081", + "127.0.0.1:8083", + }, client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { - return addrs + return nil }, GetClientFunc: func() grpc.Client { return &grpcmock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { - return status.WrapWithInternal( - agent.CreateIndexRPCName+" API connection not found", - errors.ErrGRPCClientConnNotFound("*"), - ) + return nil + }, + ConnectFunc: func(_ context.Context, _ string, _ ...grpc.DialOption) (pool.Conn, error) { + return nil, nil }, } }, }, }, - want: want{ - err: status.Error(codes.Internal, - agent.CreateIndexRPCName+" API connection not found"), - }, } }(), func() test { @@ -126,11 +123,10 @@ func Test_index_Start(t *testing.T) { "127.0.0.1:8080", } return test{ - name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error", + name: "Fail: when there is an error wrapped with gRPC status in the indexing request process", args: args{ ctx: context.Background(), }, - fields: fields{ client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { @@ -141,7 +137,10 @@ func Test_index_Start(t *testing.T) { OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { - return errors.ErrGRPCClientConnNotFound("*") + return status.WrapWithInternal( + agent.CreateIndexRPCName+" API connection not found", + errors.ErrGRPCClientConnNotFound("*"), + ) }, } }, @@ -154,29 +153,33 @@ func Test_index_Start(t *testing.T) { } }(), func() test { - targetAddrs := []string{ + addrs := []string{ "127.0.0.1:8080", } - targetAddrList := map[string]bool{ - targetAddrs[0]: true, - } return test{ - name: "Fail: when there is no address matching targetAddrList", + name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error", args: args{ ctx: context.Background(), }, fields: fields{ client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { - return nil + return addrs + }, + GetClientFunc: func() grpc.Client { + return &grpcmock.GRPCClientMock{ + OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, + _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, + ) error { + return errors.ErrGRPCClientConnNotFound("*") + }, + } }, }, - targetAddrs: targetAddrs, - targetAddrList: targetAddrList, }, want: want{ err: status.Error(codes.Internal, - agent.CreateIndexRPCName+" API connection target address \"127.0.0.1:8080\" not found"), + agent.CreateIndexRPCName+" API connection not found"), }, } }(), @@ -200,7 +203,6 @@ func Test_index_Start(t *testing.T) { idx := &index{ client: test.fields.client, targetAddrs: test.fields.targetAddrs, - targetAddrList: test.fields.targetAddrList, creationPoolSize: test.fields.creationPoolSize, concurrency: test.fields.concurrency, } @@ -312,7 +314,6 @@ func Test_index_Start(t *testing.T) { // type fields struct { // client discoverer.Client // targetAddrs []string -// targetAddrList map[string]bool // creationPoolSize uint32 // concurrency int // } @@ -349,7 +350,6 @@ func Test_index_Start(t *testing.T) { // fields: fields { // client:nil, // targetAddrs:nil, -// targetAddrList:nil, // creationPoolSize:0, // concurrency:0, // }, @@ -375,7 +375,6 @@ func Test_index_Start(t *testing.T) { // fields: fields { // client:nil, // targetAddrs:nil, -// targetAddrList:nil, // creationPoolSize:0, // concurrency:0, // }, @@ -410,7 +409,6 @@ func Test_index_Start(t *testing.T) { // idx := &index{ // client: test.fields.client, // targetAddrs: test.fields.targetAddrs, -// targetAddrList: test.fields.targetAddrList, // creationPoolSize: test.fields.creationPoolSize, // concurrency: test.fields.concurrency, // } diff --git a/pkg/index/job/save/service/indexer.go b/pkg/index/job/save/service/indexer.go index 7d8c9e4fbc..a3e8325ba4 100644 --- a/pkg/index/job/save/service/indexer.go +++ b/pkg/index/job/save/service/indexer.go @@ -42,16 +42,15 @@ type Indexer interface { } type index struct { - client discoverer.Client - targetAddrs []string - targetAddrList map[string]bool + client discoverer.Client + targetAddrs []string concurrency int } // New returns Indexer object if no error occurs. func New(opts ...Option) (Indexer, error) { - idx := new(index) + idx := &index{} for _, opt := range append(defaultOpts, opts...) { if err := opt(idx); err != nil { oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) @@ -63,13 +62,23 @@ func New(opts ...Option) (Indexer, error) { log.Warn(oerr) } } - idx.targetAddrList = make(map[string]bool, len(idx.targetAddrs)) - for _, addr := range idx.targetAddrs { - idx.targetAddrList[addr] = true - } + idx.targetAddrs = delDuplicateAddrs(idx.targetAddrs) return idx, nil } +func delDuplicateAddrs(targetAddrs []string) []string { + addrs := make([]string, 0, len(targetAddrs)) + exist := make(map[string]bool) + + for _, addr := range targetAddrs { + if !exist[addr] { + addrs = append(addrs, addr) + exist[addr] = true + } + } + return addrs +} + // StartClient starts the gRPC client. func (idx *index) StartClient(ctx context.Context) (<-chan error, error) { return idx.client.Start(ctx) @@ -123,6 +132,7 @@ func (idx *index) Start(ctx context.Context) error { return nil } +// skipcq: GO-R1005 func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error)) (errs error) { ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doSaveIndex") defer func() { @@ -133,12 +143,13 @@ func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _ targetAddrs := idx.client.GetAddrs(ctx) if len(idx.targetAddrs) != 0 { - targetAddrs = idx.extractTargetAddrs(targetAddrs) - - // If targetAddrs is empty, an invalid target addresses may be registered in targetAddrList. - if len(targetAddrs) == 0 { - return errors.ErrGRPCTargetAddrNotFound + // If target addresses is specified, that addresses are used in priority. + for _, addr := range idx.targetAddrs { + if _, err := idx.client.GetClient().Connect(ctx, addr); err != nil { + return err + } } + targetAddrs = idx.targetAddrs } log.Infof("target agent addrs: %v", targetAddrs) @@ -200,16 +211,3 @@ func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _ ) return errors.Join(err, errs) } - -// extractTargetAddresses filters and extracts target addresses registered in targetAddrList from the given address list. -func (idx *index) extractTargetAddrs(addrs []string) []string { - res := make([]string, 0, len(addrs)) - for _, addr := range addrs { - if !idx.targetAddrList[addr] { - log.Warnf("the gRPC target address not found: %s", addr) - } else { - res = append(res, addr) - } - } - return res -} diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index 67bdd94e76..ed05353bbe 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -22,6 +22,7 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/pool" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/test/goleak" clientmock "github.com/vdaas/vald/internal/test/mock/client" @@ -29,14 +30,14 @@ import ( ) func Test_index_Start(t *testing.T) { + t.Parallel() type args struct { ctx context.Context } type fields struct { - client discoverer.Client - targetAddrs []string - targetAddrList map[string]bool - concurrency int + client discoverer.Client + targetAddrs []string + concurrency int } type want struct { err error @@ -66,7 +67,6 @@ func Test_index_Start(t *testing.T) { args: args{ ctx: context.Background(), }, - fields: fields{ client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { @@ -86,37 +86,35 @@ func Test_index_Start(t *testing.T) { } }(), func() test { - addrs := []string{ - "127.0.0.1:8080", - } return test{ - name: "Fail: when there is an error wrapped with gRPC status in the save indexing request process", + name: "Success: when a target addresses (targetAddrs) is given and there are no errors in the save indexing request process", args: args{ ctx: context.Background(), }, fields: fields{ + targetAddrs: []string{ + "127.0.0.1:8080", + "127.0.0.1:8081", + "127.0.0.1:8083", + }, client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { - return addrs + return nil }, GetClientFunc: func() grpc.Client { return &grpcmock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { - return status.WrapWithInternal( - agent.SaveIndexRPCName+" API connection not found", - errors.ErrGRPCClientConnNotFound("*"), - ) + return nil + }, + ConnectFunc: func(_ context.Context, _ string, _ ...grpc.DialOption) (pool.Conn, error) { + return nil, nil }, } }, }, }, - want: want{ - err: status.Error(codes.Internal, - agent.SaveIndexRPCName+" API connection not found"), - }, } }(), func() test { @@ -124,11 +122,10 @@ func Test_index_Start(t *testing.T) { "127.0.0.1:8080", } return test{ - name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error", + name: "Fail: when there is an error wrapped with gRPC status in the save indexing request process", args: args{ ctx: context.Background(), }, - fields: fields{ client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { @@ -139,7 +136,10 @@ func Test_index_Start(t *testing.T) { OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { - return errors.ErrGRPCClientConnNotFound("*") + return status.WrapWithInternal( + agent.SaveIndexRPCName+" API connection not found", + errors.ErrGRPCClientConnNotFound("*"), + ) }, } }, @@ -152,30 +152,33 @@ func Test_index_Start(t *testing.T) { } }(), func() test { - targetAddrs := []string{ + addrs := []string{ "127.0.0.1:8080", } - targetAddrList := map[string]bool{ - targetAddrs[0]: true, - } return test{ - name: "Fail: when there is no address matching targetAddrList", + name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error", args: args{ ctx: context.Background(), }, fields: fields{ client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { - // NOTE: This function returns nil, meaning that the targetAddrs stored in the field are invalid values. - return nil + return addrs + }, + GetClientFunc: func() grpc.Client { + return &grpcmock.GRPCClientMock{ + OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, + _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, + ) error { + return errors.ErrGRPCClientConnNotFound("*") + }, + } }, }, - targetAddrs: targetAddrs, - targetAddrList: targetAddrList, }, want: want{ err: status.Error(codes.Internal, - agent.SaveIndexRPCName+" API connection target address \"127.0.0.1:8080\" not found"), + agent.SaveIndexRPCName+" API connection not found"), }, } }(), @@ -197,10 +200,9 @@ func Test_index_Start(t *testing.T) { checkFunc = defaultCheckFunc } idx := &index{ - client: test.fields.client, - targetAddrs: test.fields.targetAddrs, - targetAddrList: test.fields.targetAddrList, - concurrency: test.fields.concurrency, + client: test.fields.client, + targetAddrs: test.fields.targetAddrs, + concurrency: test.fields.concurrency, } err := idx.Start(test.args.ctx) @@ -310,7 +312,6 @@ func Test_index_Start(t *testing.T) { // type fields struct { // client discoverer.Client // targetAddrs []string -// targetAddrList map[string]bool // concurrency int // } // type want struct { @@ -346,7 +347,6 @@ func Test_index_Start(t *testing.T) { // fields: fields { // client:nil, // targetAddrs:nil, -// targetAddrList:nil, // concurrency:0, // }, // want: want{}, @@ -371,7 +371,6 @@ func Test_index_Start(t *testing.T) { // fields: fields { // client:nil, // targetAddrs:nil, -// targetAddrList:nil, // concurrency:0, // }, // want: want{}, @@ -405,7 +404,6 @@ func Test_index_Start(t *testing.T) { // idx := &index{ // client: test.fields.client, // targetAddrs: test.fields.targetAddrs, -// targetAddrList: test.fields.targetAddrList, // concurrency: test.fields.concurrency, // } //