diff --git a/charts/vald/templates/index/operator/configmap.yaml b/charts/vald/templates/index/operator/configmap.yaml index 8f977c0acb..3379f08997 100644 --- a/charts/vald/templates/index/operator/configmap.yaml +++ b/charts/vald/templates/index/operator/configmap.yaml @@ -45,4 +45,6 @@ data: agent_name: {{ $agent.name }} agent_namespace: {{ $agent.namespace }} concurrency: 1 + read_replica_enabled: {{ $agent.readreplica.enabled }} + read_replica_label_key: {{ $agent.readreplica.label_key }} {{- end }} diff --git a/cmd/index/job/readreplica/rotate/sample.yaml b/cmd/index/job/readreplica/rotate/sample.yaml index fbdb3dd43a..77166b6452 100644 --- a/cmd/index/job/readreplica/rotate/sample.yaml +++ b/cmd/index/job/readreplica/rotate/sample.yaml @@ -17,7 +17,7 @@ version: v0.0.0 time_zone: JST logging: format: raw - level: info + level: debug logger: glg server_config: servers: diff --git a/cmd/index/operator/sample.yaml b/cmd/index/operator/sample.yaml index 9eb0ea6d34..be13b3df90 100644 --- a/cmd/index/operator/sample.yaml +++ b/cmd/index/operator/sample.yaml @@ -72,6 +72,8 @@ operator: agent_name: "vald-agent" agent_namespace: "default" concurrency: 1 + read_replica_enabled: true + read_replica_label_key: "vald-readreplica-id" observability: enabled: false otlp: diff --git a/internal/config/index_operator.go b/internal/config/index_operator.go index 053941603b..59ccc8e389 100644 --- a/internal/config/index_operator.go +++ b/internal/config/index_operator.go @@ -13,7 +13,7 @@ // limitations under the License. package config -// IndexCreation represents the configurations for index creation. +// IndexOperator represents the configurations for index k8s operator. type IndexOperator struct { // AgentName represent agents meta_name for service discovery AgentName string `json:"agent_name" yaml:"agent_name"` @@ -23,6 +23,12 @@ type IndexOperator struct { // Concurrency represents indexing concurrency. Concurrency int `json:"concurrency" yaml:"concurrency"` + + // ReadReplicaEnabled represents whether read replica is enabled or not. + ReadReplicaEnabled bool `json:"read_replica_enabled" yaml:"read_replica_enabled"` + + // ReadReplicaLabelKey represents the label key for read replica. + ReadReplicaLabelKey string `json:"read_replica_label_key" yaml:"read_replica_label_key"` } func (ic *IndexOperator) Bind() *IndexOperator { diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 4235a304d9..0798bca32e 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -63,6 +63,7 @@ const ( WatchDeletedEvent = watch.Deleted SelectionOpEquals = selection.Equals SelectionOpExists = selection.Exists + PodIndexLabel = appsv1.PodIndexLabel ) var ( @@ -99,6 +100,9 @@ type Client interface { // Watch watches the given obj for changes and takes the appropriate callbacks. Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error) + // MatchingLabels filters the list/delete operation on the given set of labels. + MatchingLabels(labels map[string]string) cli.MatchingLabels + // LabelSelector creates labels.Selector for Options like ListOptions. LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) } @@ -173,6 +177,10 @@ func (c *client) Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOpti return c.withWatch.Watch(ctx, obj, opts...) } +func (*client) MatchingLabels(labels map[string]string) cli.MatchingLabels { + return cli.MatchingLabels(labels) +} + func (*client) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) { requirements, err := labels.NewRequirement(key, op, vals) if err != nil { @@ -251,12 +259,8 @@ func (s *patcher) ApplyPodAnnotations(ctx context.Context, name, namespace strin } patch := &unstructured.Unstructured{Object: obj} - if err := s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{ + return s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{ FieldManager: s.fieldManager, Force: ptr.To(true), - }); err != nil { - return err - } - - return nil + }) } diff --git a/internal/k8s/pod/pod.go b/internal/k8s/pod/pod.go index 4828a2a47c..1229f1620c 100644 --- a/internal/k8s/pod/pod.go +++ b/internal/k8s/pod/pod.go @@ -53,6 +53,7 @@ type Pod struct { CPURequest float64 MemLimit float64 MemRequest float64 + Labels map[string]string Annotations map[string]string } @@ -110,6 +111,7 @@ func (r *reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (res re pods = make(map[string][]Pod, len(ps.Items)) ) + // skipcq: CRT-P0006 for _, pod := range ps.Items { if pod.GetObjectMeta().GetDeletionTimestamp() != nil || (r.namespace != "" && !strings.EqualFold(pod.GetNamespace(), r.namespace)) || @@ -151,6 +153,7 @@ func (r *reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (res re CPURequest: cpuRequest, MemLimit: memLimit, MemRequest: memRequest, + Labels: pod.GetLabels(), Annotations: pod.GetAnnotations(), }) } diff --git a/internal/k8s/vald/annotations.go b/internal/k8s/vald/annotations.go new file mode 100644 index 0000000000..24ef49e6ce --- /dev/null +++ b/internal/k8s/vald/annotations.go @@ -0,0 +1,29 @@ +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package vald + +import "time" + +const ( + TimeFormat = time.RFC3339Nano + UncommittedAnnotationsKey = "vald.vdaas.org/uncommitted" + UnsavedProcessedVQAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq" + UnsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution" + LastTimeSaveIndexTimestampAnnotationsKey = "vald.vdaas.org/last-time-save-index-timestamp" + IndexCountAnnotationsKey = "vald.vdaas.org/index-count" + LastTimeSnapshotTimestampAnnotationsKey = "vald.vdaas.org/last-time-snapshot-timestamp" +) diff --git a/internal/test/mock/k8s/client.go b/internal/test/mock/k8s/client.go index 1d51944043..a40828d8ec 100644 --- a/internal/test/mock/k8s/client.go +++ b/internal/test/mock/k8s/client.go @@ -30,7 +30,7 @@ type ValdK8sClientMock struct { var _ client.Client = (*ValdK8sClientMock)(nil) -func (m *ValdK8sClientMock) Get(ctx context.Context, name string, namespace string, obj client.Object, opts ...crclient.GetOption) error { +func (m *ValdK8sClientMock) Get(ctx context.Context, name, namespace string, obj client.Object, opts ...crclient.GetOption) error { args := m.Called(ctx, name, namespace, obj, opts) return args.Error(0) } @@ -65,6 +65,11 @@ func (m *ValdK8sClientMock) Watch(ctx context.Context, obj crclient.ObjectList, return args.Get(0).(watch.Interface), args.Error(1) } +func (m *ValdK8sClientMock) MatchingLabels(labels map[string]string) client.MatchingLabels { + args := m.Called(labels) + return args.Get(0).(client.MatchingLabels) +} + func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) { args := m.Called(key, op, vals) return args.Get(0).(labels.Selector), args.Error(1) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index bba35dec01..f1e1a80902 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -40,6 +40,7 @@ import ( "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/k8s/client" + "github.com/vdaas/vald/internal/k8s/vald" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" @@ -2006,7 +2007,7 @@ func (n *ngt) unsavedNumberOfCreateIndexExecutionEntry() (k, v string) { } func (n *ngt) lastTimeSaveIndexTimestampEntry(timestamp time.Time) (k, v string) { - return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(time.RFC3339) + return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(vald.TimeFormat) } func (n *ngt) indexCountEntry() (k, v string) { diff --git a/pkg/agent/core/ngt/service/ngt_test.go b/pkg/agent/core/ngt/service/ngt_test.go index 51848539f6..3adcf18726 100644 --- a/pkg/agent/core/ngt/service/ngt_test.go +++ b/pkg/agent/core/ngt/service/ngt_test.go @@ -36,6 +36,7 @@ import ( core "github.com/vdaas/vald/internal/core/algorithm/ngt" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/file" + kvald "github.com/vdaas/vald/internal/k8s/vald" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/strings" @@ -1300,7 +1301,7 @@ func TestExportIndexInfo(t *testing.T) { unsavedProcessedVqAnnotationsKey: "2", } expectedAfterSave := map[string]string{ - lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(time.RFC3339), + lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(kvald.TimeFormat), unsavedCreateIndexExecutionNumAnnotationsKey: "0", unsavedProcessedVqAnnotationsKey: "0", } diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index c478e12965..ec64ac39bc 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -23,6 +23,7 @@ import ( snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/k8s/client" + "github.com/vdaas/vald/internal/k8s/vald" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" @@ -30,7 +31,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) const ( @@ -160,7 +161,7 @@ func (s *subProcess) rotate(ctx context.Context) error { return err } - err = s.updateDeployment(ctx, newPvc.GetName(), deployment) + err = s.updateDeployment(ctx, newPvc.GetName(), deployment, newSnap.CreationTimestamp.Time) if err != nil { log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName()) if dperr := s.deletePVC(ctx, newPvc); dperr != nil { @@ -211,7 +212,7 @@ func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Depl Kind: "Deployment", Name: deployment.GetName(), UID: deployment.GetUID(), - Controller: pointer.Bool(true), + Controller: ptr.To(true), }, }, }, @@ -257,7 +258,7 @@ func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployme Kind: "Deployment", Name: deployment.GetName(), UID: deployment.GetUID(), - Controller: pointer.Bool(true), + Controller: ptr.To(true), }, }, }, @@ -295,11 +296,17 @@ func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, err return &list.Items[0], nil } -func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error { +func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment, snapshotTime time.Time) error { if deployment.Spec.Template.ObjectMeta.Annotations == nil { deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{} } - deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().UTC().Format(time.RFC3339) + now := time.Now().UTC().Format(time.RFC3339) + deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = now + + if deployment.Annotations == nil { + deployment.Annotations = map[string]string{} + } + deployment.Annotations[vald.LastTimeSnapshotTimestampAnnotationsKey] = snapshotTime.UTC().Format(vald.TimeFormat) for _, vol := range deployment.Spec.Template.Spec.Volumes { if vol.Name == s.volumeName { diff --git a/pkg/index/operator/service/operator.go b/pkg/index/operator/service/operator.go index 0ae30c55d7..0b8045ba88 100644 --- a/pkg/index/operator/service/operator.go +++ b/pkg/index/operator/service/operator.go @@ -15,12 +15,16 @@ package service import ( "context" + "fmt" "reflect" + "time" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/k8s" + "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/k8s/job" "github.com/vdaas/vald/internal/k8s/pod" + "github.com/vdaas/vald/internal/k8s/vald" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" @@ -37,9 +41,12 @@ type Operator interface { } type operator struct { - ctrl k8s.Controller - eg errgroup.Group - namespace string + ctrl k8s.Controller + eg errgroup.Group + namespace string + client client.Client + readReplicaEnabled bool + readReplicaLabelKey string } // New returns Indexer object if no error occurs. @@ -88,6 +95,13 @@ func New(agentName string, opts ...Option) (o Operator, err error) { if err != nil { return nil, err } + + client, err := client.New() + if err != nil { + return nil, err + } + operator.client = client + return operator, nil } @@ -127,15 +141,79 @@ func (o *operator) podOnReconcile(ctx context.Context, podList map[string][]pod. for k, v := range podList { for _, pod := range v { log.Debug("key", k, "name:", pod.Name, "annotations:", pod.Annotations) + + // rotate read replica if needed + if o.readReplicaEnabled { + if err := o.rotateIfNeeded(ctx, pod); err != nil { + log.Error(err) + } + } } } } // TODO: implement job reconcile logic to detect save job completion and to start rotation. -func (o *operator) jobOnReconcile(ctx context.Context, jobList map[string][]job.Job) { +func (*operator) jobOnReconcile(_ context.Context, jobList map[string][]job.Job) { for k, v := range jobList { + // skipcq: CRT-P0006 for _, job := range v { log.Debug("key", k, "name:", job.Name, "status:", job.Status) } } } + +// rotateIfNeeded starts rotation job when the condition meets. +// This function is work in progress. +func (o *operator) rotateIfNeeded(ctx context.Context, pod pod.Pod) error { + t, ok := pod.Annotations[vald.LastTimeSaveIndexTimestampAnnotationsKey] + if !ok { + log.Info("the agent pod has not saved index yet. skipping...") + return nil + } + lastSavedTime, err := time.Parse(vald.TimeFormat, t) + if err != nil { + return fmt.Errorf("parsing last time saved time: %w", err) + } + + podIdx, ok := pod.Labels[client.PodIndexLabel] + if !ok { + log.Info("no index label found. the agent is not StatefulSet? skipping...") + return nil + } + + var depList client.DeploymentList + selector, err := o.client.LabelSelector(o.readReplicaLabelKey, client.SelectionOpEquals, []string{podIdx}) + if err != nil { + return fmt.Errorf("creating label selector: %w", err) + } + listOpts := client.ListOptions{ + Namespace: o.namespace, + LabelSelector: selector, + } + if err := o.client.List(ctx, &depList, &listOpts); err != nil { + return err + } + if len(depList.Items) == 0 { + return errors.New("no readreplica deployment found") + } + dep := depList.Items[0] + + annotations := dep.GetAnnotations() + t, ok = annotations[vald.LastTimeSnapshotTimestampAnnotationsKey] + if ok { + lastSnapshotTime, err := time.Parse(vald.TimeFormat, t) + if err != nil { + return fmt.Errorf("parsing last snapshot time: %w", err) + } + + if lastSnapshotTime.After(lastSavedTime) { + log.Info("snapshot taken after the last save. skipping...") + return nil + } + } + + log.Infof("rotation required for agent id: %s. creating rotator job...", podIdx) + // TODO: check if the rotator job already exists or queued + // then create rotation job + return nil +} diff --git a/pkg/index/operator/service/options.go b/pkg/index/operator/service/options.go index 00f3f298fe..c4e768ba70 100644 --- a/pkg/index/operator/service/options.go +++ b/pkg/index/operator/service/options.go @@ -30,3 +30,17 @@ func WithErrGroup(eg errgroup.Group) Option { return nil } } + +func WithReadReplicaEnabled(enabled bool) Option { + return func(o *operator) error { + o.readReplicaEnabled = enabled + return nil + } +} + +func WithReadReplicaLabelKey(key string) Option { + return func(o *operator) error { + o.readReplicaLabelKey = key + return nil + } +} diff --git a/pkg/index/operator/usecase/operator.go b/pkg/index/operator/usecase/operator.go index 391ccca1e3..c73760d7e5 100644 --- a/pkg/index/operator/usecase/operator.go +++ b/pkg/index/operator/usecase/operator.go @@ -41,7 +41,11 @@ type run struct { // New returns Runner instance. func New(cfg *config.Data) (_ runner.Runner, err error) { eg := errgroup.Get() - operator, err := service.New(cfg.Operator.AgentName) + operator, err := service.New( + cfg.Operator.AgentName, + service.WithReadReplicaEnabled(cfg.Operator.ReadReplicaEnabled), + service.WithReadReplicaLabelKey(cfg.Operator.ReadReplicaLabelKey), + ) if err != nil { return nil, err }