diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 1efcdf1915..4235a304d9 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -181,7 +181,13 @@ func (*client) LabelSelector(key string, op selection.Operator, vals []string) ( return labels.NewSelector().Add(*requirements), nil } -type Patcher struct { +// Patcher is an interface for patching resources with controller-runtime client. +type Patcher interface { + // ApplyPodAnnotations applies the given annotations to the agent pod with server-side apply. + ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error +} + +type patcher struct { client Client fieldManager string } @@ -189,16 +195,16 @@ type Patcher struct { func NewPatcher(fieldManager string) (Patcher, error) { client, err := New() if err != nil { - return Patcher{}, err + return nil, err } - return Patcher{ + return &patcher{ client: client, fieldManager: fieldManager, }, nil } -func (s *Patcher) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error { +func (s *patcher) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error { var podList corev1.PodList if err := s.client.List(ctx, &podList, &cli.ListOptions{ Namespace: namespace, diff --git a/internal/test/mock/k8s/client.go b/internal/test/mock/k8s/client.go index 663b4b7f73..1d51944043 100644 --- a/internal/test/mock/k8s/client.go +++ b/internal/test/mock/k8s/client.go @@ -69,3 +69,14 @@ func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, val args := m.Called(key, op, vals) return args.Get(0).(labels.Selector), args.Error(1) } + +type PatcherMock struct { + mock.Mock +} + +var _ client.Patcher = (*PatcherMock)(nil) + +func (m *PatcherMock) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error { + args := m.Called(ctx, name, namespace, entries) + return args.Error(0) +} diff --git a/pkg/agent/core/ngt/service/export_test.go b/pkg/agent/core/ngt/service/export_test.go new file mode 100644 index 0000000000..bc9db25433 --- /dev/null +++ b/pkg/agent/core/ngt/service/export_test.go @@ -0,0 +1,19 @@ +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package service + +var ExportMetricsOnTick = (*ngt).exportMetricsOnTick diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index ddb05b288b..0b272a2bf3 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -51,6 +51,8 @@ import ( "github.com/vdaas/vald/pkg/agent/internal/vqueue" ) +type contextSaveIndexTimeKey string + type NGT interface { Start(ctx context.Context) <-chan error Search(ctx context.Context, vec []float32, size uint32, epsilon, radius float32) (*payload.Search_Response, error) @@ -161,12 +163,14 @@ const ( originIndexDirName = "origin" brokenIndexDirName = "broken" - fieldManager = "vald-agent-index-controller" uncommittedAnnotationsKey = "vald.vdaas.org/uncommitted" unsavedProcessedVqAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq" unsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution" lastTimeSaveIndexTimestampAnnotationsKey = "vald.vdaas.org/last-time-save-index-timestamp" indexCountAnnotationsKey = "vald.vdaas.org/index-count" + + // use this only for tests. usually just leave the ctx value empty and let time.Now() be used + saveIndexTimeKey contextSaveIndexTimeKey = "saveIndexTimeKey" ) func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { @@ -233,14 +237,6 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { n.indexing.Store(false) n.saving.Store(false) - if n.enableExportIndexInfo { - patcher, err := client.NewPatcher(fieldManager) - if err != nil { - return nil, fmt.Errorf("failed to create pacher: %w", err) - } - n.patcher = patcher - } - return n, nil } @@ -1864,9 +1860,8 @@ func (n *ngt) unsavedNumberOfCreateIndexExecutionEntry() (k, v string) { return unsavedCreateIndexExecutionNumAnnotationsKey, strconv.FormatUint(num, 10) } -func (n *ngt) lastTimeSaveIndexTimestampEntry() (k, v string) { - timestamp := time.Now().UTC().Format(time.RFC3339) - return lastTimeSaveIndexTimestampAnnotationsKey, timestamp +func (n *ngt) lastTimeSaveIndexTimestampEntry(timestamp time.Time) (k, v string) { + return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(time.RFC3339) } func (n *ngt) indexCountEntry() (k, v string) { @@ -1904,7 +1899,13 @@ func (n *ngt) exportMetricsOnCreateIndex(ctx context.Context) error { func (n *ngt) exportMetricsOnSaveIndex(ctx context.Context) error { entries := make(map[string]string) - k, v := n.lastTimeSaveIndexTimestampEntry() + val := ctx.Value(saveIndexTimeKey) + t, ok := val.(time.Time) + if !ok { + t = time.Now() + } + + k, v := n.lastTimeSaveIndexTimestampEntry(t) entries[k] = v k, v = n.unsavedNumberOfCreateIndexExecutionEntry() diff --git a/pkg/agent/core/ngt/service/ngt_test.go b/pkg/agent/core/ngt/service/ngt_test.go index c1caceff53..51848539f6 100644 --- a/pkg/agent/core/ngt/service/ngt_test.go +++ b/pkg/agent/core/ngt/service/ngt_test.go @@ -44,6 +44,8 @@ import ( testdata "github.com/vdaas/vald/internal/test" "github.com/vdaas/vald/internal/test/data/vector" "github.com/vdaas/vald/internal/test/goleak" + "github.com/vdaas/vald/internal/test/mock/k8s" + "github.com/vdaas/vald/internal/test/testify" "github.com/vdaas/vald/pkg/agent/internal/kvs" "github.com/vdaas/vald/pkg/agent/internal/metadata" "github.com/vdaas/vald/pkg/agent/internal/vqueue" @@ -1126,6 +1128,241 @@ func Test_ngt_Close(t *testing.T) { } } +func TestExportIndexInfo(t *testing.T) { + t.Parallel() + config := defaultConfig + config.Dimension = 3 + config.EnableExportIndexInfoToK8s = true + config.PodName = "test-pod" + + type test struct { + name string + testfunc func(t *testing.T) + } + + tests := []test{ + { + "export after create index one vector", + func(t *testing.T) { + mock := &k8s.PatcherMock{} + mock.On("ApplyPodAnnotations", + testify.Anything, + testify.Anything, + testify.Anything, + testify.Anything, + ).Return(nil) + + ngt, err := New(&config, WithPatcher(mock)) + require.NoError(t, err) + + now := time.Now().UnixNano() + err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, now) + require.NoError(t, err) + + err = ngt.CreateIndex(context.Background(), 10) + require.NoError(t, err) + + // expected entries + expected := map[string]string{ + indexCountAnnotationsKey: "1", + uncommittedAnnotationsKey: "0", + unsavedCreateIndexExecutionNumAnnotationsKey: "1", + unsavedProcessedVqAnnotationsKey: "1", + } + // check mock called result + mock.AssertExpectations(t) + mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 1) + mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expected) + }, + }, + { + "export after create index multiple vectors", + func(t *testing.T) { + mock := &k8s.PatcherMock{} + mock.On("ApplyPodAnnotations", + testify.Anything, + testify.Anything, + testify.Anything, + testify.Anything, + ).Return(nil) + + ngt, err := New(&config, WithPatcher(mock)) + require.NoError(t, err) + + time1 := time.Now().UnixNano() + err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1) + require.NoError(t, err) + + time2 := time.Now().UnixNano() + err = ngt.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2) + require.NoError(t, err) + + err = ngt.CreateIndex(context.Background(), 10) + require.NoError(t, err) + + // expected entries + expected := map[string]string{ + indexCountAnnotationsKey: "2", + uncommittedAnnotationsKey: "0", + unsavedCreateIndexExecutionNumAnnotationsKey: "1", + unsavedProcessedVqAnnotationsKey: "2", + } + // check mock called result + mock.AssertExpectations(t) + mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 1) + mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expected) + }, + }, + { + "export after create index multiple times", + func(t *testing.T) { + mock := &k8s.PatcherMock{} + mock.On("ApplyPodAnnotations", + testify.Anything, + testify.Anything, + testify.Anything, + testify.Anything, + ).Return(nil) + + ngt, err := New(&config, WithPatcher(mock)) + require.NoError(t, err) + + time1 := time.Now().UnixNano() + err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1) + require.NoError(t, err) + + err = ngt.CreateIndex(context.Background(), 10) + require.NoError(t, err) + + time2 := time.Now().UnixNano() + err = ngt.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2) + require.NoError(t, err) + + err = ngt.CreateIndex(context.Background(), 10) + require.NoError(t, err) + + // expected entries + expected := map[string]string{ + indexCountAnnotationsKey: "2", + uncommittedAnnotationsKey: "0", + unsavedCreateIndexExecutionNumAnnotationsKey: "2", + unsavedProcessedVqAnnotationsKey: "2", + } + // check mock called result + mock.AssertExpectations(t) + mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 2) + mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expected) + }, + }, + { + "export after create index multiple vectors and save index", + func(t *testing.T) { + mock := &k8s.PatcherMock{} + mock.On("ApplyPodAnnotations", + testify.Anything, + testify.Anything, + testify.Anything, + testify.Anything, + ).Return(nil) + + tmpdir := t.TempDir() + + ngt, err := New(&config, + WithIndexPath(tmpdir), + WithPatcher(mock), + ) + require.NoError(t, err) + + time1 := time.Now().UnixNano() + err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1) + require.NoError(t, err) + + time2 := time.Now().UnixNano() + err = ngt.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2) + require.NoError(t, err) + + ctx := context.Background() + err = ngt.CreateIndex(ctx, 10) + require.NoError(t, err) + + // set time in context for testing + saveIndexTime := time.Now() + ctx = context.WithValue(ctx, saveIndexTimeKey, saveIndexTime) + + err = ngt.SaveIndex(ctx) + require.NoError(t, err) + + // expected entries + expectedAfterCreate := map[string]string{ + indexCountAnnotationsKey: "2", + uncommittedAnnotationsKey: "0", + unsavedCreateIndexExecutionNumAnnotationsKey: "1", + unsavedProcessedVqAnnotationsKey: "2", + } + expectedAfterSave := map[string]string{ + lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(time.RFC3339), + unsavedCreateIndexExecutionNumAnnotationsKey: "0", + unsavedProcessedVqAnnotationsKey: "0", + } + // check mock called result + mock.AssertExpectations(t) + mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 2) + mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expectedAfterCreate) + mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expectedAfterSave) + }, + }, + { + "export after inserting vectors", + func(t *testing.T) { + mock := &k8s.PatcherMock{} + mock.On("ApplyPodAnnotations", + testify.Anything, + testify.Anything, + testify.Anything, + testify.Anything, + ).Return(nil) + + tmpdir := t.TempDir() + + n, err := New(&config, + WithIndexPath(tmpdir), + WithPatcher(mock), + ) + require.NoError(t, err) + + time1 := time.Now().UnixNano() + err = n.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1) + require.NoError(t, err) + + time2 := time.Now().UnixNano() + err = n.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2) + require.NoError(t, err) + + ctx := context.Background() + ExportMetricsOnTick(n.(*ngt), ctx) + + // expected entries + expectedAfterInsert := map[string]string{ + indexCountAnnotationsKey: "0", + uncommittedAnnotationsKey: "2", + } + // check mock called result + mock.AssertExpectations(t) + mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 1) + mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expectedAfterInsert) + }, + }, + } + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + test.testfunc(tt) + }) + } +} + func Test_ngt_InsertUpsert(t *testing.T) { if testing.Short() { t.Skip("The execution of this test takes a lot of time, so it is not performed during the short test\ttest: Test_ngt_InsertUpsert") diff --git a/pkg/agent/core/ngt/service/option.go b/pkg/agent/core/ngt/service/option.go index 096115d0c1..9d2a07f654 100644 --- a/pkg/agent/core/ngt/service/option.go +++ b/pkg/agent/core/ngt/service/option.go @@ -23,7 +23,9 @@ import ( "time" core "github.com/vdaas/vald/internal/core/algorithm/ngt" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/file" + "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/rand" "github.com/vdaas/vald/internal/strings" "github.com/vdaas/vald/internal/sync/errgroup" @@ -326,3 +328,14 @@ func WithExportIndexInfoDuration(dur string) Option { return nil } } + +// WithPatcher returns the functional option to set the patcher for patching k8s resources. +func WithPatcher(p client.Patcher) Option { + return func(n *ngt) error { + if p == nil { + return errors.NewErrInvalidOption("patcher", p) + } + n.patcher = p + return nil + } +} diff --git a/pkg/agent/core/ngt/usecase/agentd.go b/pkg/agent/core/ngt/usecase/agentd.go index eab6039b2e..d596c72e4d 100644 --- a/pkg/agent/core/ngt/usecase/agentd.go +++ b/pkg/agent/core/ngt/usecase/agentd.go @@ -22,6 +22,7 @@ import ( agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" vald "github.com/vdaas/vald/apis/grpc/v1/vald" iconf "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/net/grpc" "github.com/vdaas/vald/internal/observability" ngtmetrics "github.com/vdaas/vald/internal/observability/metrics/agent/core/ngt" @@ -47,9 +48,12 @@ type run struct { observability observability.Observability } +const ( + fieldManager = "vald-agent-index-controller" +) + func New(cfg *config.Data) (r runner.Runner, err error) { - ngt, err := service.New( - cfg.NGT, + serviceOpts := []service.Option{ service.WithErrGroup(errgroup.Get()), service.WithEnableInMemoryMode(cfg.NGT.EnableInMemoryMode), service.WithIndexPath(cfg.NGT.IndexPath), @@ -67,11 +71,25 @@ func New(cfg *config.Data) (r runner.Runner, err error) { service.WithProactiveGC(cfg.NGT.EnableProactiveGC), service.WithCopyOnWrite(cfg.NGT.EnableCopyOnWrite), service.WithIsReadReplica(cfg.NGT.IsReadReplica), - service.WithExportIndexInfoDuration(cfg.NGT.ExportIndexInfoDuration), + } + if cfg.NGT.EnableExportIndexInfoToK8s { + patcher, err := client.NewPatcher(fieldManager) + if err != nil { + return nil, err + } + serviceOpts = append(serviceOpts, + service.WithPatcher(patcher), + service.WithExportIndexInfoDuration(cfg.NGT.ExportIndexInfoDuration), + ) + } + ngt, err := service.New( + cfg.NGT, + serviceOpts..., ) if err != nil { return nil, err } + g, err := handler.New( handler.WithNGT(ngt), handler.WithStreamConcurrency(cfg.Server.GetGRPCStreamConcurrency()),