From 699ba22c890f09212554076660a3db310016fee6 Mon Sep 17 00:00:00 2001 From: Yusuke Kadowaki Date: Fri, 16 Feb 2024 16:57:35 +0900 Subject: [PATCH] Make agent export index metrics to Pod k8s resource (#2319) * Add config * Add clusterRole and serviceAccount for agent * Add more metrics * Format * Add export index info duration configuration * Refactor internal/k8s * Move apply function to internal/k8s * Add more config * Remove unused import and variable * Use vald errors * Add pod name and pod namespace to hack e2e agent * Apply tagalign * Revert unnecessary specification of PodName * Enable export index info to Kubernetes in agent templates only when the config is enabled * Fix agent clusterrole name * Update key name * Update comments * Add index count as export metrics * Ignore gomnd * Add index count entry to exportMetricsOnCreateIndex function * Update values schema * Fix formatter job * Remove unused defines --- .github/workflows/format.yml | 2 +- .../vald-helm-operator/crds/valdrelease.yaml | 29 +++ charts/vald/templates/agent/clusterrole.yaml | 37 ++++ .../templates/agent/clusterrolebinding.yaml | 37 ++++ charts/vald/templates/agent/daemonset.yaml | 3 + charts/vald/templates/agent/deployment.yaml | 3 + .../vald/templates/agent/serviceaccount.yaml | 29 +++ charts/vald/templates/agent/statefulset.yaml | 3 + charts/vald/values.schema.json | 52 +++++ charts/vald/values.yaml | 36 ++++ cmd/agent/core/ngt/sample.yaml | 2 + internal/config/ngt.go | 14 ++ internal/k8s/client/client.go | 110 +++++++++-- pkg/agent/core/ngt/service/ngt.go | 177 ++++++++++++++++-- pkg/agent/core/ngt/service/option.go | 19 ++ pkg/agent/core/ngt/service/option_test.go | 85 +++++++++ pkg/agent/core/ngt/usecase/agentd.go | 1 + .../job/readreplica/rotate/service/rotator.go | 2 +- 18 files changed, 607 insertions(+), 34 deletions(-) create mode 100644 charts/vald/templates/agent/clusterrole.yaml create mode 100644 charts/vald/templates/agent/clusterrolebinding.yaml create mode 100644 
charts/vald/templates/agent/serviceaccount.yaml diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 7cc66cea3d..c690ed6dc6 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -105,7 +105,7 @@ jobs: run: | make deps/install make format - git checkout go.mod go.sum ./rust/Cargo.lock + git checkout go.mod go.sum ./example/client/go.mod ./example/client/go.sum ./rust/Cargo.lock - name: Check format and deps difference run: | if git diff --quiet --exit-code; then diff --git a/charts/vald-helm-operator/crds/valdrelease.yaml b/charts/vald-helm-operator/crds/valdrelease.yaml index fa8d749b29..6e7776f91d 100644 --- a/charts/vald-helm-operator/crds/valdrelease.yaml +++ b/charts/vald-helm-operator/crds/valdrelease.yaml @@ -124,6 +124,20 @@ spec: annotations: type: object x-kubernetes-preserve-unknown-fields: true + clusterRole: + type: object + properties: + enabled: + type: boolean + name: + type: string + clusterRoleBinding: + type: object + properties: + enabled: + type: boolean + name: + type: string enabled: type: boolean env: @@ -251,6 +265,8 @@ spec: - normalizedcosine enable_copy_on_write: type: boolean + enable_export_index_info_to_k8s: + type: boolean enable_in_memory_mode: type: boolean enable_proactive_gc: @@ -258,6 +274,8 @@ spec: error_buffer_limit: type: integer minimum: 1 + export_index_info_duration: + type: string index_path: type: string initial_delay_max_duration: @@ -273,12 +291,16 @@ spec: type: string min_load_index_timeout: type: string + namespace: + type: string object_type: type: string enum: - float - float16 - uint8 + pod_name: + type: string search_edge_size: type: integer vqueue: @@ -959,6 +981,13 @@ spec: labels: type: object x-kubernetes-preserve-unknown-fields: true + serviceAccount: + type: object + properties: + enabled: + type: boolean + name: + type: string serviceType: type: string enum: diff --git a/charts/vald/templates/agent/clusterrole.yaml 
b/charts/vald/templates/agent/clusterrole.yaml new file mode 100644 index 0000000000..79861a9b80 --- /dev/null +++ b/charts/vald/templates/agent/clusterrole.yaml @@ -0,0 +1,37 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +{{- $agent := .Values.agent -}} +{{- if and $agent.enabled $agent.clusterRole.enabled $agent.ngt.enable_export_index_info_to_k8s }} +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ $agent.clusterRole.name }} + labels: + app.kubernetes.io/name: {{ include "vald.name" . }} + helm.sh/chart: {{ include "vald.chart" . }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/version: {{ .Chart.Version }} + app.kubernetes.io/component: agent +rules: + - apiGroups: + - "" + resources: + - pods + verbs: + - list + - patch +{{- end }} diff --git a/charts/vald/templates/agent/clusterrolebinding.yaml b/charts/vald/templates/agent/clusterrolebinding.yaml new file mode 100644 index 0000000000..98b84ec621 --- /dev/null +++ b/charts/vald/templates/agent/clusterrolebinding.yaml @@ -0,0 +1,37 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +{{- $agent := .Values.agent -}} +{{- if and $agent.enabled $agent.clusterRoleBinding.enabled $agent.ngt.enable_export_index_info_to_k8s }} +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ $agent.clusterRoleBinding.name }} + labels: + app.kubernetes.io/name: {{ include "vald.name" . }} + helm.sh/chart: {{ include "vald.chart" . }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/version: {{ .Chart.Version }} + app.kubernetes.io/component: agent +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ $agent.clusterRole.name }} +subjects: + - kind: ServiceAccount + name: {{ $agent.serviceAccount.name }} + namespace: {{ .Release.Namespace }} +{{- end }} diff --git a/charts/vald/templates/agent/daemonset.yaml b/charts/vald/templates/agent/daemonset.yaml index 7642ed1f97..7d5b53d79c 100644 --- a/charts/vald/templates/agent/daemonset.yaml +++ b/charts/vald/templates/agent/daemonset.yaml @@ -167,6 +167,9 @@ spec: {{- toYaml $agent.podSecurityContext | nindent 8 }} {{- end }} terminationGracePeriodSeconds: {{ $agent.terminationGracePeriodSeconds }} + {{- if and $agent.serviceAccount.enabled $agent.ngt.enable_export_index_info_to_k8s }} + serviceAccountName: {{ $agent.serviceAccount.name }} + {{- end }} volumes: - name: {{ $agent.name }}-config configMap: diff --git a/charts/vald/templates/agent/deployment.yaml b/charts/vald/templates/agent/deployment.yaml index 6472b0ad54..d6577a0b3d 100644 --- 
a/charts/vald/templates/agent/deployment.yaml +++ b/charts/vald/templates/agent/deployment.yaml @@ -171,6 +171,9 @@ spec: {{- toYaml $agent.podSecurityContext | nindent 8 }} {{- end }} terminationGracePeriodSeconds: {{ $agent.terminationGracePeriodSeconds }} + {{- if and $agent.serviceAccount.enabled $agent.ngt.enable_export_index_info_to_k8s }} + serviceAccountName: {{ $agent.serviceAccount.name }} + {{- end }} volumes: - name: {{ $agent.name }}-config configMap: diff --git a/charts/vald/templates/agent/serviceaccount.yaml b/charts/vald/templates/agent/serviceaccount.yaml new file mode 100644 index 0000000000..da550b5c4e --- /dev/null +++ b/charts/vald/templates/agent/serviceaccount.yaml @@ -0,0 +1,29 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +{{- $agent := .Values.agent -}} +{{- if and $agent.enabled $agent.serviceAccount.enabled $agent.ngt.enable_export_index_info_to_k8s }} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ $agent.serviceAccount.name }} + labels: + app.kubernetes.io/name: {{ include "vald.name" . }} + helm.sh/chart: {{ include "vald.chart" . 
}} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/version: {{ .Chart.Version }} + app.kubernetes.io/component: agent +{{- end }} diff --git a/charts/vald/templates/agent/statefulset.yaml b/charts/vald/templates/agent/statefulset.yaml index 855bec3c17..ecad246c50 100644 --- a/charts/vald/templates/agent/statefulset.yaml +++ b/charts/vald/templates/agent/statefulset.yaml @@ -209,6 +209,9 @@ spec: {{- toYaml $agent.podSecurityContext | nindent 8 }} {{- end }} terminationGracePeriodSeconds: {{ $agent.terminationGracePeriodSeconds }} + {{- if and $agent.serviceAccount.enabled $agent.ngt.enable_export_index_info_to_k8s }} + serviceAccountName: {{ $agent.serviceAccount.name }} + {{- end }} volumes: - name: {{ $agent.name }}-config configMap: diff --git a/charts/vald/values.schema.json b/charts/vald/values.schema.json index 3812adcd1d..285a78aa2a 100644 --- a/charts/vald/values.schema.json +++ b/charts/vald/values.schema.json @@ -70,6 +70,29 @@ "type": "object", "description": "deployment annotations" }, + "clusterRole": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "creates clusterRole resource" + }, + "name": { "type": "string", "description": "name of clusterRole" } + } + }, + "clusterRoleBinding": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "creates clusterRoleBinding resource" + }, + "name": { + "type": "string", + "description": "name of clusterRoleBinding" + } + } + }, "enabled": { "type": "boolean", "description": "agent enabled" }, "env": { "type": "array", @@ -238,6 +261,10 @@ "type": "boolean", "description": "enable copy on write saving for more stable backup" }, + "enable_export_index_info_to_k8s": { + "type": "boolean", + "description": "enable export index info to k8s" + }, "enable_in_memory_mode": { "type": "boolean", "description": "in-memory mode enabled" @@ -251,6 +278,10 @@ 
"description": "maximum number of core ngt error buffer pool size limit", "minimum": 1 }, + "export_index_info_duration": { + "type": "string", + "description": "duration of exporting index info" + }, "index_path": { "type": "string", "description": "path to index data" @@ -280,11 +311,19 @@ "type": "string", "description": "minimum duration of load index timeout" }, + "namespace": { + "type": "string", + "description": "namespace of myself" + }, "object_type": { "type": "string", "description": "object type. it should be `float` or `uint8` or `float16`. for further details: https://github.com/yahoojapan/NGT/wiki/Command-Quick-Reference", "enum": ["float", "float16", "uint8"] }, + "pod_name": { + "type": "string", + "description": "pod name of myself" + }, "search_edge_size": { "type": "integer", "description": "search edge size" @@ -1459,6 +1498,19 @@ "labels": { "type": "object", "description": "service labels" } } }, + "serviceAccount": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "description": "creates service account" + }, + "name": { + "type": "string", + "description": "name of service account" + } + } + }, "serviceType": { "type": "string", "description": "service type: ClusterIP, LoadBalancer or NodePort", diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index b9a5954bd1..710817c6e6 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -1904,6 +1904,30 @@ agent: # @schema {"name": "agent.initContainers", "alias": "initContainers"} # agent.initContainers -- init containers initContainers: [] + # @schema {"name": "agent.clusterRole", "type": "object"} + clusterRole: + # @schema {"name": "agent.clusterRole.enabled", "type": "boolean"} + # agent.clusterRole.enabled -- creates clusterRole resource + enabled: true + # @schema {"name": "agent.clusterRole.name", "type": "string"} + # agent.clusterRole.name -- name of clusterRole + name: agent + # @schema {"name": "agent.clusterRoleBinding", "type": 
"object"} + clusterRoleBinding: + # @schema {"name": "agent.clusterRoleBinding.enabled", "type": "boolean"} + # agent.clusterRoleBinding.enabled -- creates clusterRoleBinding resource + enabled: true + # @schema {"name": "agent.clusterRoleBinding.name", "type": "string"} + # agent.clusterRoleBinding.name -- name of clusterRoleBinding + name: agent + # @schema {"name": "agent.serviceAccount", "type": "object"} + serviceAccount: + # @schema {"name": "agent.serviceAccount.enabled", "type": "boolean"} + # agent.serviceAccount.enabled -- creates service account + enabled: true + # @schema {"name": "agent.serviceAccount.name", "type": "string"} + # agent.serviceAccount.name -- name of service account + name: agent-ngt # @schema {"name": "agent.env", "alias": "env"} # agent.env -- environment variables env: @@ -2035,6 +2059,12 @@ agent: annotations: {} # @schema {"name": "agent.ngt", "type": "object"} ngt: + # @schema {"name": "agent.ngt.pod_name", "type": "string"} + # agent.ngt.pod_name -- pod name of myself + pod_name: _MY_POD_NAME_ + # @schema {"name": "agent.ngt.namespace", "type": "string"} + # agent.ngt.namespace -- namespace of myself + namespace: _MY_POD_NAMESPACE_ # @schema {"name": "agent.ngt.index_path", "type": "string"} # agent.ngt.index_path -- path to index data index_path: "" @@ -2107,6 +2137,12 @@ agent: # @schema {"name": "agent.ngt.enable_copy_on_write", "type": "boolean"} # agent.ngt.enable_copy_on_write -- enable copy on write saving for more stable backup enable_copy_on_write: false + # @schema {"name": "agent.ngt.enable_export_index_info_to_k8s", "type": "boolean"} + # agent.ngt.enable_export_index_info_to_k8s -- enable export index info to k8s + enable_export_index_info_to_k8s: false + # @schema {"name": "agent.ngt.export_index_info_duration", "type": "string"} + # agent.ngt.export_index_info_duration -- duration of exporting index info + export_index_info_duration: 1m # @schema {"name": "agent.ngt.vqueue", "type": "object"} vqueue: # @schema 
{"name": "agent.ngt.vqueue.insert_buffer_pool_size", "type": "integer"} diff --git a/cmd/agent/core/ngt/sample.yaml b/cmd/agent/core/ngt/sample.yaml index 2741e2ea70..6d2f5390ba 100644 --- a/cmd/agent/core/ngt/sample.yaml +++ b/cmd/agent/core/ngt/sample.yaml @@ -102,6 +102,8 @@ observability: service_name: "vald-agent-ngt" buffer_max_count: 10 ngt: + pod_name: "vald-agent-ngt-0" # this might overwrite k8s resource of agent pod 0 + namespace: "default" auto_create_index_pool_size: 10000 auto_index_check_duration: 30m auto_index_duration_limit: 24h diff --git a/internal/config/ngt.go b/internal/config/ngt.go index 01b8b31165..236a1c1fa8 100644 --- a/internal/config/ngt.go +++ b/internal/config/ngt.go @@ -19,6 +19,12 @@ package config // NGT represent the ngt core configuration for server. type NGT struct { + // PodName represent the ngt pod name + PodName string `json:"pod_name,omitempty" yaml:"pod_name"` + + // PodNamespace represent the ngt pod namespace + PodNamespace string `json:"namespace,omitempty" yaml:"namespace"` + // IndexPath represent the ngt index file path IndexPath string `json:"index_path,omitempty" yaml:"index_path"` @@ -97,6 +103,12 @@ type NGT struct { // IsReadReplica represents whether the ngt is read replica or not IsReadReplica bool `json:"is_readreplica" yaml:"is_readreplica"` + + // EnableExportIndexInfoToK8s represents whether the ngt index info is exported to k8s or not + EnableExportIndexInfoToK8s bool `json:"enable_export_index_info_to_k8s" yaml:"enable_export_index_info_to_k8s"` + + // ExportIndexInfoDuration represents the duration of exporting index info to k8s + ExportIndexInfoDuration string `json:"export_index_info_duration,omitempty" yaml:"export_index_info_duration"` } // KVSDB represent the ngt vector bidirectional kv store configuration. @@ -116,6 +128,8 @@ type VQueue struct { // Bind returns NGT object whose some string value is filed value or environment value. 
func (n *NGT) Bind() *NGT { + n.PodName = GetActualValue(n.PodName) + n.PodNamespace = GetActualValue(n.PodNamespace) n.IndexPath = GetActualValue(n.IndexPath) n.DistanceType = GetActualValue(n.DistanceType) n.ObjectType = GetActualValue(n.ObjectType) diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 65b05c5696..bac11df061 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -22,15 +22,21 @@ import ( "fmt" snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" + "github.com/vdaas/vald/internal/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" + applycorev1 "k8s.io/client-go/applyconfigurations/core/v1" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" cli "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" ) type ( @@ -54,6 +60,11 @@ const ( SelectionOpEquals = selection.Equals ) +var ( + ServerSideApply = cli.Apply + MergePatch = cli.Merge +) + type Client interface { // Get retrieves an obj for the given object key from the Kubernetes Cluster. // obj must be a struct pointer so that obj can be updated with the response @@ -75,6 +86,10 @@ type Client interface { // struct pointer so that obj can be updated with the content returned by the Server. Update(ctx context.Context, obj Object, opts ...cli.UpdateOption) error + // Patch patches the given obj in the Kubernetes cluster. obj must be a + // struct pointer so that obj can be updated with the content returned by the Server. 
+ Patch(ctx context.Context, obj Object, patch cli.Patch, opts ...cli.PatchOption) error + // Watch watches the given obj for changes and takes the appropriate callbacks. Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error) @@ -84,11 +99,10 @@ type Client interface { type client struct { scheme *runtime.Scheme - reader cli.Reader withWatch cli.WithWatch } -func New(opts ...Option) (Client, error) { +func New(opts ...Option) (_ Client, err error) { c := new(client) if c.scheme == nil { c.scheme = runtime.NewScheme() @@ -106,13 +120,7 @@ func New(opts ...Option) (Client, error) { if err := snapshotv1.AddToScheme(c.scheme); err != nil { return nil, err } - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{ - Scheme: c.scheme, - }) - if err != nil { - return nil, err - } - c.reader = mgr.GetAPIReader() + c.withWatch, err = cli.NewWithWatch(ctrl.GetConfigOrDie(), cli.Options{ Scheme: c.scheme, }) @@ -124,7 +132,7 @@ func New(opts ...Option) (Client, error) { } func (c *client) Get(ctx context.Context, name, namespace string, obj cli.Object, opts ...cli.GetOption) error { - return c.reader.Get( + return c.withWatch.Get( ctx, cli.ObjectKey{ Name: name, @@ -136,7 +144,7 @@ func (c *client) Get(ctx context.Context, name, namespace string, obj cli.Object } func (c *client) List(ctx context.Context, list cli.ObjectList, opts ...cli.ListOption) error { - return c.reader.List(ctx, list, opts...) + return c.withWatch.List(ctx, list, opts...) } func (c *client) Create(ctx context.Context, obj Object, opts ...CreateOption) error { @@ -151,6 +159,10 @@ func (c *client) Update(ctx context.Context, obj Object, opts ...cli.UpdateOptio return c.withWatch.Update(ctx, obj, opts...) } +func (c *client) Patch(ctx context.Context, obj Object, patch cli.Patch, opts ...cli.PatchOption) error { + return c.withWatch.Patch(ctx, obj, patch, opts...) 
+} + func (c *client) Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error) { return c.withWatch.Watch(ctx, obj, opts...) } @@ -162,3 +174,77 @@ func (*client) LabelSelector(key string, op selection.Operator, vals []string) ( } return labels.NewSelector().Add(*requirements), nil } + +type Patcher struct { + client Client + fieldManager string +} + +func NewPatcher(fieldManager string) (Patcher, error) { + client, err := New() + if err != nil { + return Patcher{}, err + } + + return Patcher{ + client: client, + fieldManager: fieldManager, + }, nil +} + +func (s *Patcher) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error { + var podList corev1.PodList + if err := s.client.List(ctx, &podList, &cli.ListOptions{ + Namespace: namespace, + FieldSelector: fields.OneTermEqualSelector("metadata.name", name), + }); err != nil { + return err + } + + if len(podList.Items) == 0 { + return errors.New("agent pod not found on exporting metrics") + } + + //nolint: gomnd + if len(podList.Items) >= 2 { + return errors.New("multiple agent pods found on exporting metrics. pods with same name exist in the same namespace?") + } + pod := podList.Items[0] + + curApplyConfig, err := applycorev1.ExtractPod(&pod, s.fieldManager) + if err != nil { + return err + } + + // check if there is any diffs in the annotations + annotations := pod.GetObjectMeta().GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + for k, v := range entries { + annotations[k] = v + } + expectPod := applycorev1.Pod(name, namespace). 
+ WithAnnotations(annotations) + + if equality.Semantic.DeepEqual(expectPod, curApplyConfig) { + // no change found in the pod annotations + return nil + } + + // now we found the diffs, apply the changes + obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(expectPod) + if err != nil { + return err + } + + patch := &unstructured.Unstructured{Object: obj} + if err := s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{ + FieldManager: s.fieldManager, + Force: ptr.To(true), + }); err != nil { + return err + } + + return nil +} diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 7c7ecfb450..4429d4ab7b 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -28,6 +28,7 @@ import ( "reflect" "runtime" "slices" + "strconv" "sync/atomic" "time" @@ -38,6 +39,7 @@ import ( core "github.com/vdaas/vald/internal/core/algorithm/ngt" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/file" + "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" @@ -102,10 +104,11 @@ type ngt struct { lastNocie uint64 // last number of create index execution this value prevent unnecessary saveindex. 
// counters - nocie uint64 // number of create index execution - nogce uint64 // number of proactive GC execution - wfci uint64 // wait for create indexing - nobic uint64 // number of broken index count + nocie uint64 // number of create index execution + nogce uint64 // number of proactive GC execution + wfci uint64 // wait for create indexing + nobic uint64 // number of broken index count + nopvq atomic.Uint64 // number of processed vq number // configurations inMem bool // in-memory mode @@ -123,13 +126,15 @@ type ngt struct { enableProactiveGC bool // if this value is true, agent component will purge GC memory more proactive enableCopyOnWrite bool // if this value is true, agent component will write backup file using Copy on Write and saves old files to the old directory - path string // index path - smu sync.Mutex // save index lock - tmpPath atomic.Value // temporary index path for Copy on Write - oldPath string // old volume path - basePath string // index base directory for CoW - brokenPath string // backup broken index path - cowmu sync.Mutex // copy on write move lock + podName string + podNamespace string + path string // index path + smu sync.Mutex // save index lock + tmpPath atomic.Value // temporary index path for Copy on Write + oldPath string // old volume path + basePath string // index base directory for CoW + brokenPath string // backup broken index path + cowmu sync.Mutex // copy on write move lock poolSize uint32 // default pool size radius float32 // default radius @@ -141,7 +146,10 @@ type ngt struct { kvsdbConcurrency int // kvsdb concurrency historyLimit int // the maximum generation number of broken index backup - isReadReplica bool + isReadReplica bool + enableExportIndexInfo bool + exportIndexInfoDuration time.Duration + patcher client.Patcher } const ( @@ -152,16 +160,29 @@ const ( oldIndexDirName = "backup" originIndexDirName = "origin" brokenIndexDirName = "broken" + + fieldManager = "vald-agent-index-controller" + 
uncommittedAnnotationsKey = "vald.vdaas.org/uncommitted" + unsavedProcessedVqAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq" + unsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution" + lastTimeSaveIndexTimestampAnnotationsKey = "vald.vdaas.org/last-time-save-index-timestamp" + indexCountAnnotationsKey = "vald.vdaas.org/index-count" ) func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { + if cfg.PodName == "" && cfg.EnableExportIndexInfoToK8s { + return nil, errors.New("pod_name is empty. this must be set either from environment variable or from config file") + } n := &ngt{ - fmap: make(map[string]int64), - dim: cfg.Dimension, - enableProactiveGC: cfg.EnableProactiveGC, - enableCopyOnWrite: cfg.EnableCopyOnWrite, - kvsdbConcurrency: cfg.KVSDB.Concurrency, - historyLimit: cfg.BrokenIndexHistoryLimit, + podName: cfg.PodName, + podNamespace: cfg.PodNamespace, + fmap: make(map[string]int64), + dim: cfg.Dimension, + enableProactiveGC: cfg.EnableProactiveGC, + enableCopyOnWrite: cfg.EnableCopyOnWrite, + kvsdbConcurrency: cfg.KVSDB.Concurrency, + historyLimit: cfg.BrokenIndexHistoryLimit, + enableExportIndexInfo: cfg.EnableExportIndexInfoToK8s, } for _, opt := range append(defaultOptions, opts...) 
{ @@ -211,6 +232,15 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { } n.indexing.Store(false) n.saving.Store(false) + + if n.enableExportIndexInfo { + patcher, err := client.NewPatcher(fieldManager) + if err != nil { + return nil, fmt.Errorf("failed to create pacher: %w", err) + } + n.patcher = patcher + } + return n, nil } @@ -831,6 +861,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error { n.lim = math.MaxInt64 } + // add initial delay before starting auto indexing if n.idelay > 0 { timer := time.NewTimer(n.idelay) select { @@ -845,9 +876,11 @@ func (n *ngt) Start(ctx context.Context) <-chan error { tick := time.NewTicker(n.dur) sTick := time.NewTicker(n.sdur) limit := time.NewTicker(n.lim) + exportTick := time.NewTicker(n.exportIndexInfoDuration) defer tick.Stop() defer sTick.Stop() defer limit.Stop() + defer exportTick.Stop() for { err = nil select { @@ -866,6 +899,10 @@ func (n *ngt) Start(ctx context.Context) <-chan error { err = n.CreateAndSaveIndex(ctx, n.poolSize) case <-sTick.C: err = n.SaveIndex(ctx) + case <-exportTick.C: + if n.enableExportIndexInfo { + err = n.exportMetricsOnTick(ctx) + } } if err != nil && err != errors.ErrUncommittedIndexNotFound { ech <- err @@ -1149,6 +1186,9 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { } log.Infof("create index operation started, uncommitted indexes = %d", ic) log.Debug("create index delete phase started") + // vqProcessedCnt is a temporary counter to store the number of processed vqueue items. + // This will be added to nopvq after CreateIndex operation succeeds. 
+ var vqProcessedCnt uint64 n.vq.RangePopDelete(ctx, now, func(uuid string) bool { log.Debugf("start delete operation for kvsdb id: %s", uuid) oid, ok := n.kvs.Delete(uuid) @@ -1164,6 +1204,8 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { n.fmu.Unlock() } log.Debugf("removed from ngt index and kvsdb id: %s, oid: %d", uuid, oid) + + vqProcessedCnt++ return true }) log.Debug("create index delete phase finished") @@ -1196,6 +1238,8 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { } n.fmu.Unlock() log.Debugf("finished to insert ngt index and kvsdb id: %s, oid: %d", uuid, oid) + + vqProcessedCnt++ return true }) if poolSize <= 0 { @@ -1211,11 +1255,21 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { err = n.core.CreateIndex(poolSize) if err != nil { log.Error("an error occurred on creating graph and tree phase:", err) - } else { - atomic.AddUint64(&n.nocie, 1) + return err } + + atomic.AddUint64(&n.nocie, 1) + n.nopvq.Add(vqProcessedCnt) log.Debug("create graph and tree phase finished") log.Info("create index operation finished") + + if n.enableExportIndexInfo { + log.Info("exporting index info to k8s resource") + if err := n.exportMetricsOnCreateIndex(ctx); err != nil { + log.Errorf("failed to export index info to k8s resource: %v", err) + return err + } + } return err } @@ -1305,7 +1359,13 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) { return err } n.saving.Store(true) + + // number of processed vq before save operation + // this will be subtracted from n.nopvq after save operation succeeds + beforeNopvq := n.nopvq.Load() + defer n.gc() + // since deferring here, atomic operations are guaranteed in this scope defer n.saving.Store(false) log.Debug("cleanup invalid index started") @@ -1496,6 +1556,14 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) { return err } log.Info("save index operation finished") + + // now save operation succeeds, subtract 
it from n.nopvq + n.nopvq.Add(-beforeNopvq) + if n.enableExportIndexInfo { + if err := n.exportMetricsOnSaveIndex(ctx); err != nil { + return err + } + } return nil } @@ -1644,6 +1712,10 @@ func (n *ngt) NumberOfProactiveGCExecution() uint64 { return atomic.LoadUint64(&n.nogce) } +func (n *ngt) lastNumberOfCreateIndexExecution() uint64 { + return atomic.LoadUint64(&n.lastNocie) +} + func (n *ngt) gc() { if n.enableProactiveGC { runtime.GC() @@ -1765,3 +1837,68 @@ func (n *ngt) toSearchResponse(sr []algorithm.SearchResult) (res *payload.Search } return res, nil } + +func (n *ngt) uncommittedEntry() (k, v string) { + return uncommittedAnnotationsKey, strconv.FormatUint(n.InsertVQueueBufferLen()+n.DeleteVQueueBufferLen(), 10) +} + +func (n *ngt) processedVqEntries() (k, v string) { + return unsavedProcessedVqAnnotationsKey, strconv.FormatUint(n.nopvq.Load(), 10) +} + +func (n *ngt) unsavedNumberOfCreateIndexExecutionEntry() (k, v string) { + num := n.NumberOfCreateIndexExecution() - n.lastNumberOfCreateIndexExecution() + return unsavedCreateIndexExecutionNumAnnotationsKey, strconv.FormatUint(num, 10) +} + +func (n *ngt) lastTimeSaveIndexTimestampEntry() (k, v string) { + timestamp := time.Now().UTC().Format(time.RFC3339) + return lastTimeSaveIndexTimestampAnnotationsKey, timestamp +} + +func (n *ngt) indexCountEntry() (k, v string) { + return indexCountAnnotationsKey, strconv.FormatUint(n.Len(), 10) +} + +func (n *ngt) exportMetricsOnTick(ctx context.Context) error { + entries := make(map[string]string) + k, v := n.uncommittedEntry() + entries[k] = v + + k, v = n.indexCountEntry() + entries[k] = v + + return n.patcher.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, entries) +} + +func (n *ngt) exportMetricsOnCreateIndex(ctx context.Context) error { + entries := make(map[string]string) + k, v := n.uncommittedEntry() + entries[k] = v + + k, v = n.processedVqEntries() + entries[k] = v + + k, v = n.unsavedNumberOfCreateIndexExecutionEntry() + entries[k] = v + + k, v = 
n.indexCountEntry() + entries[k] = v + + return n.patcher.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, entries) +} + +func (n *ngt) exportMetricsOnSaveIndex(ctx context.Context) error { + entries := make(map[string]string) + + k, v := n.lastTimeSaveIndexTimestampEntry() + entries[k] = v + + k, v = n.unsavedNumberOfCreateIndexExecutionEntry() + entries[k] = v + + k, v = n.processedVqEntries() + entries[k] = v + + return n.patcher.ApplyPodAnnotations(ctx, n.podName, n.podNamespace, entries) +} diff --git a/pkg/agent/core/ngt/service/option.go b/pkg/agent/core/ngt/service/option.go index b567f92ccc..096115d0c1 100644 --- a/pkg/agent/core/ngt/service/option.go +++ b/pkg/agent/core/ngt/service/option.go @@ -47,6 +47,7 @@ var defaultOptions = []Option{ WithDefaultRadius(core.DefaultRadius), WithDefaultEpsilon(core.DefaultEpsilon), WithProactiveGC(true), + WithExportIndexInfoDuration("1m"), } // WithErrGroup returns the functional option to set the error group. @@ -307,3 +308,21 @@ func WithIsReadReplica(isReadReplica bool) Option { return nil } } + +// WithExportIndexInfoDuration returns the functional option to set the duration of exporting index info to k8s. 
+func WithExportIndexInfoDuration(dur string) Option { + return func(n *ngt) error { + if dur == "" { + return nil + } + + d, err := timeutil.Parse(dur) + if err != nil { + return err + } + + n.exportIndexInfoDuration = d + + return nil + } +} diff --git a/pkg/agent/core/ngt/service/option_test.go b/pkg/agent/core/ngt/service/option_test.go index ac861fcf51..4fe907a153 100644 --- a/pkg/agent/core/ngt/service/option_test.go +++ b/pkg/agent/core/ngt/service/option_test.go @@ -1306,4 +1306,89 @@ func TestWithCopyOnWrite(t *testing.T) { } } +func TestWithExportIndexInfoDuration(t *testing.T) { + type T = ngt + type args struct { + dur string + } + type want struct { + obj *T + err error + } + type test struct { + name string + args args + want want + checkFunc func(want, *T, error) error + beforeFunc func(args) + afterFunc func(*testing.T, args) + } + defaultCheckFunc := func(w want, obj *T, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + if !reflect.DeepEqual(obj, w.obj) { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", obj, w.obj) + } + return nil + } + tests := []test{ + { + name: "set success when duration is empty string", + args: args{ + dur: "", + }, + want: want{ + obj: &T{ + exportIndexInfoDuration: 0, + }, + }, + }, + { + name: "set success when duration is a valid duration string", + args: args{ + dur: "5s", + }, + want: want{ + obj: &T{ + exportIndexInfoDuration: 5 * time.Second, + }, + }, + }, + { + name: "return error when duration is not a valid duration string", + args: args{ + dur: "5ss", + }, + want: want{ + obj: &T{}, + err: errors.Join(errors.New("time: unknown unit \"ss\" in duration \"5ss\""), errors.ErrTimeoutParseFailed("5ss")), + }, + }, + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(test.args) + } + 
if test.afterFunc != nil { + defer test.afterFunc(tt, test.args) + } + checkFunc := defaultCheckFunc + if test.checkFunc != nil { + checkFunc = test.checkFunc + } + + got := WithExportIndexInfoDuration(test.args.dur) + obj := new(T) + if err := checkFunc(test.want, obj, got(obj)); err != nil { + tt.Errorf("error = %v", err) + } + }) + } +} + // NOT IMPLEMENTED BELOW diff --git a/pkg/agent/core/ngt/usecase/agentd.go b/pkg/agent/core/ngt/usecase/agentd.go index ff5a6ee0ab..eab6039b2e 100644 --- a/pkg/agent/core/ngt/usecase/agentd.go +++ b/pkg/agent/core/ngt/usecase/agentd.go @@ -67,6 +67,7 @@ func New(cfg *config.Data) (r runner.Runner, err error) { service.WithProactiveGC(cfg.NGT.EnableProactiveGC), service.WithCopyOnWrite(cfg.NGT.EnableCopyOnWrite), service.WithIsReadReplica(cfg.NGT.IsReadReplica), + service.WithExportIndexInfoDuration(cfg.NGT.ExportIndexInfoDuration), ) if err != nil { return nil, err diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index 5a1fca4a34..6ce4272d73 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -271,7 +271,7 @@ func (r *rotator) updateDeployment(ctx context.Context, newPVC string, deploymen if deployment.Spec.Template.ObjectMeta.Annotations == nil { deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{} } - deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339) + deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().UTC().Format(time.RFC3339) for _, vol := range deployment.Spec.Template.Spec.Volumes { if vol.Name == r.volumeName {