Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added snapshot timestamp annotations to read replica agent #2428

Merged
merged 19 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions charts/vald/templates/index/operator/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,6 @@ data:
agent_name: {{ $agent.name }}
agent_namespace: {{ $agent.namespace }}
concurrency: 1
read_replica_enabled: {{ $agent.readreplica.enabled }}
read_replica_label_key: {{ $agent.readreplica.label_key }}
{{- end }}
2 changes: 1 addition & 1 deletion cmd/index/job/readreplica/rotate/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ version: v0.0.0
time_zone: JST
logging:
format: raw
level: info
level: debug
logger: glg
server_config:
servers:
Expand Down
2 changes: 2 additions & 0 deletions cmd/index/operator/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ operator:
agent_name: "vald-agent"
agent_namespace: "default"
concurrency: 1
read_replica_enabled: true
read_replica_label_key: "vald-readreplica-id"
observability:
enabled: false
otlp:
Expand Down
8 changes: 7 additions & 1 deletion internal/config/index_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.
package config

// IndexCreation represents the configurations for index creation.
// IndexOperator represents the configurations for index k8s operator.
type IndexOperator struct {
// AgentName represent agents meta_name for service discovery
AgentName string `json:"agent_name" yaml:"agent_name"`
Expand All @@ -23,6 +23,12 @@ type IndexOperator struct {

// Concurrency represents indexing concurrency.
Concurrency int `json:"concurrency" yaml:"concurrency"`

// ReadReplicaEnabled represents whether read replica is enabled or not.
ReadReplicaEnabled bool `json:"read_replica_enabled" yaml:"read_replica_enabled"`

// ReadReplicaLabelKey represents the label key for read replica.
ReadReplicaLabelKey string `json:"read_replica_label_key" yaml:"read_replica_label_key"`
}

func (ic *IndexOperator) Bind() *IndexOperator {
Expand Down
16 changes: 10 additions & 6 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
WatchDeletedEvent = watch.Deleted
SelectionOpEquals = selection.Equals
SelectionOpExists = selection.Exists
PodIndexLabel = appsv1.PodIndexLabel
)

var (
Expand Down Expand Up @@ -99,6 +100,9 @@
// Watch watches the given obj for changes and takes the appropriate callbacks.
Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error)

// MatchingLabels filters the list/delete operation on the given set of labels.
MatchingLabels(labels map[string]string) cli.MatchingLabels

// LabelSelector creates labels.Selector for Options like ListOptions.
LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error)
}
Expand Down Expand Up @@ -173,6 +177,10 @@
return c.withWatch.Watch(ctx, obj, opts...)
}

func (*client) MatchingLabels(labels map[string]string) cli.MatchingLabels {
return cli.MatchingLabels(labels)

Check warning on line 181 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L180-L181

Added lines #L180 - L181 were not covered by tests
}

func (*client) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) {
requirements, err := labels.NewRequirement(key, op, vals)
if err != nil {
Expand Down Expand Up @@ -251,12 +259,8 @@
}

patch := &unstructured.Unstructured{Object: obj}
if err := s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{
return s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{

Check warning on line 262 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L262

Added line #L262 was not covered by tests
FieldManager: s.fieldManager,
Force: ptr.To(true),
}); err != nil {
return err
}

return nil
})

Check warning on line 265 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L265

Added line #L265 was not covered by tests
}
3 changes: 3 additions & 0 deletions internal/k8s/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
CPURequest float64
MemLimit float64
MemRequest float64
Labels map[string]string
Annotations map[string]string
}

Expand Down Expand Up @@ -110,6 +111,7 @@
pods = make(map[string][]Pod, len(ps.Items))
)

// skipcq: CRT-P0006

Check warning on line 114 in internal/k8s/pod/pod.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/pod/pod.go#L114

Added line #L114 was not covered by tests
for _, pod := range ps.Items {
if pod.GetObjectMeta().GetDeletionTimestamp() != nil ||
(r.namespace != "" && !strings.EqualFold(pod.GetNamespace(), r.namespace)) ||
Expand Down Expand Up @@ -151,6 +153,7 @@
CPURequest: cpuRequest,
MemLimit: memLimit,
MemRequest: memRequest,
Labels: pod.GetLabels(),

Check warning on line 156 in internal/k8s/pod/pod.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/pod/pod.go#L156

Added line #L156 was not covered by tests
Annotations: pod.GetAnnotations(),
})
}
Expand Down
29 changes: 29 additions & 0 deletions internal/k8s/vald/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package vald
ykadowak marked this conversation as resolved.
Show resolved Hide resolved

import "time"

// Annotation keys under the "vald.vdaas.org/" prefix, plus the timestamp layout
// used for the timestamp-valued annotations.
const (
// TimeFormat is the layout (RFC 3339 with nanosecond precision) used both to
// format and to parse the timestamp-valued annotations.
TimeFormat = time.RFC3339Nano
// UncommittedAnnotationsKey is an annotation key.
// NOTE(review): presumably holds the agent's count of uncommitted index entries — confirm against the agent.
UncommittedAnnotationsKey = "vald.vdaas.org/uncommitted"
// UnsavedProcessedVQAnnotationsKey is an annotation key.
// NOTE(review): presumably counts vector-queue entries processed since the last save — confirm.
UnsavedProcessedVQAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq"
// UnsavedCreateIndexExecutionNumAnnotationsKey is an annotation key.
// NOTE(review): presumably counts create-index executions since the last save — confirm.
UnsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution"
// LastTimeSaveIndexTimestampAnnotationsKey is the agent pod annotation that
// records when the index was last saved; its value is parsed with TimeFormat.
LastTimeSaveIndexTimestampAnnotationsKey = "vald.vdaas.org/last-time-save-index-timestamp"
// IndexCountAnnotationsKey is an annotation key.
// NOTE(review): presumably holds the number of indexed vectors — confirm.
IndexCountAnnotationsKey = "vald.vdaas.org/index-count"
// LastTimeSnapshotTimestampAnnotationsKey is the read replica Deployment
// annotation that records the creation time of the latest snapshot, formatted
// with TimeFormat.
LastTimeSnapshotTimestampAnnotationsKey = "vald.vdaas.org/last-time-snapshot-timestamp"
)
7 changes: 6 additions & 1 deletion internal/test/mock/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

var _ client.Client = (*ValdK8sClientMock)(nil)

func (m *ValdK8sClientMock) Get(ctx context.Context, name string, namespace string, obj client.Object, opts ...crclient.GetOption) error {
func (m *ValdK8sClientMock) Get(ctx context.Context, name, namespace string, obj client.Object, opts ...crclient.GetOption) error {

Check warning on line 33 in internal/test/mock/k8s/client.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/k8s/client.go#L33

Added line #L33 was not covered by tests
args := m.Called(ctx, name, namespace, obj, opts)
return args.Error(0)
}
Expand Down Expand Up @@ -65,6 +65,11 @@
return args.Get(0).(watch.Interface), args.Error(1)
}

func (m *ValdK8sClientMock) MatchingLabels(labels map[string]string) client.MatchingLabels {
args := m.Called(labels)
return args.Get(0).(client.MatchingLabels)

Check warning on line 70 in internal/test/mock/k8s/client.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/k8s/client.go#L68-L70

Added lines #L68 - L70 were not covered by tests
}

func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) {
args := m.Called(key, op, vals)
return args.Get(0).(labels.Selector), args.Error(1)
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
Expand Down Expand Up @@ -2006,7 +2007,7 @@ func (n *ngt) unsavedNumberOfCreateIndexExecutionEntry() (k, v string) {
}

func (n *ngt) lastTimeSaveIndexTimestampEntry(timestamp time.Time) (k, v string) {
return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(time.RFC3339)
return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(vald.TimeFormat)
}

func (n *ngt) indexCountEntry() (k, v string) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/core/ngt/service/ngt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
core "github.com/vdaas/vald/internal/core/algorithm/ngt"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
kvald "github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/strings"
Expand Down Expand Up @@ -1300,7 +1301,7 @@ func TestExportIndexInfo(t *testing.T) {
unsavedProcessedVqAnnotationsKey: "2",
}
expectedAfterSave := map[string]string{
lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(time.RFC3339),
lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(kvald.TimeFormat),
unsavedCreateIndexExecutionNumAnnotationsKey: "0",
unsavedProcessedVqAnnotationsKey: "0",
}
Expand Down
19 changes: 13 additions & 6 deletions pkg/index/job/readreplica/rotate/service/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)

const (
Expand Down Expand Up @@ -160,7 +161,7 @@
return err
}

err = s.updateDeployment(ctx, newPvc.GetName(), deployment)
err = s.updateDeployment(ctx, newPvc.GetName(), deployment, newSnap.CreationTimestamp.Time)

Check warning on line 164 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L164

Added line #L164 was not covered by tests
if err != nil {
log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName())
if dperr := s.deletePVC(ctx, newPvc); dperr != nil {
Expand Down Expand Up @@ -211,7 +212,7 @@
Kind: "Deployment",
Name: deployment.GetName(),
UID: deployment.GetUID(),
Controller: pointer.Bool(true),
Controller: ptr.To(true),

Check warning on line 215 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L215

Added line #L215 was not covered by tests
},
},
},
Expand Down Expand Up @@ -257,7 +258,7 @@
Kind: "Deployment",
Name: deployment.GetName(),
UID: deployment.GetUID(),
Controller: pointer.Bool(true),
Controller: ptr.To(true),

Check warning on line 261 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L261

Added line #L261 was not covered by tests
},
},
},
Expand Down Expand Up @@ -295,11 +296,17 @@
return &list.Items[0], nil
}

func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error {
func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment, snapshotTime time.Time) error {

Check warning on line 299 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L299

Added line #L299 was not covered by tests
if deployment.Spec.Template.ObjectMeta.Annotations == nil {
deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{}
}
deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().UTC().Format(time.RFC3339)
now := time.Now().UTC().Format(time.RFC3339)
deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = now

if deployment.Annotations == nil {
deployment.Annotations = map[string]string{}
}
deployment.Annotations[vald.LastTimeSnapshotTimestampAnnotationsKey] = snapshotTime.UTC().Format(vald.TimeFormat)

Check warning on line 309 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L303-L309

Added lines #L303 - L309 were not covered by tests

for _, vol := range deployment.Spec.Template.Spec.Volumes {
if vol.Name == s.volumeName {
Expand Down
86 changes: 82 additions & 4 deletions pkg/index/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@

import (
"context"
"fmt"
"reflect"
"time"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/k8s"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/job"
"github.com/vdaas/vald/internal/k8s/pod"
"github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
Expand All @@ -37,9 +41,12 @@
}

type operator struct {
ctrl k8s.Controller
eg errgroup.Group
namespace string
ctrl k8s.Controller
eg errgroup.Group
namespace string
client client.Client
readReplicaEnabled bool
readReplicaLabelKey string
}

// New returns Indexer object if no error occurs.
Expand Down Expand Up @@ -88,6 +95,13 @@
if err != nil {
return nil, err
}

client, err := client.New()
if err != nil {
return nil, err
}
operator.client = client

Check warning on line 104 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L99-L104

Added lines #L99 - L104 were not covered by tests
return operator, nil
}

Expand Down Expand Up @@ -127,15 +141,79 @@
for k, v := range podList {
for _, pod := range v {
log.Debug("key", k, "name:", pod.Name, "annotations:", pod.Annotations)

// rotate read replica if needed
if o.readReplicaEnabled {
if err := o.rotateIfNeeded(ctx, pod); err != nil {
log.Error(err)
}

Check warning on line 149 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L144-L149

Added lines #L144 - L149 were not covered by tests
}
}
}
}

// TODO: implement job reconcile logic to detect save job completion and to start rotation.
func (o *operator) jobOnReconcile(ctx context.Context, jobList map[string][]job.Job) {
func (*operator) jobOnReconcile(_ context.Context, jobList map[string][]job.Job) {

Check warning on line 156 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L156

Added line #L156 was not covered by tests
for k, v := range jobList {
// skipcq: CRT-P0006

Check warning on line 158 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L158

Added line #L158 was not covered by tests
for _, job := range v {
log.Debug("key", k, "name:", job.Name, "status:", job.Status)
}
}
}

// rotateIfNeeded starts rotation job when the condition meets.
// This function is work in progress.
func (o *operator) rotateIfNeeded(ctx context.Context, pod pod.Pod) error {
t, ok := pod.Annotations[vald.LastTimeSaveIndexTimestampAnnotationsKey]
if !ok {
log.Info("the agent pod has not saved index yet. skipping...")
return nil
}
lastSavedTime, err := time.Parse(vald.TimeFormat, t)
if err != nil {
return fmt.Errorf("parsing last time saved time: %w", err)
}

Check warning on line 176 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L167-L176

Added lines #L167 - L176 were not covered by tests

podIdx, ok := pod.Labels[client.PodIndexLabel]
if !ok {
log.Info("no index label found. the agent is not StatefulSet? skipping...")
return nil
}

Check warning on line 182 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L178-L182

Added lines #L178 - L182 were not covered by tests

var depList client.DeploymentList
selector, err := o.client.LabelSelector(o.readReplicaLabelKey, client.SelectionOpEquals, []string{podIdx})
if err != nil {
return fmt.Errorf("creating label selector: %w", err)
}
listOpts := client.ListOptions{
Namespace: o.namespace,
LabelSelector: selector,
}
if err := o.client.List(ctx, &depList, &listOpts); err != nil {
return err
}
if len(depList.Items) == 0 {
return errors.New("no readreplica deployment found")
}
dep := depList.Items[0]

annotations := dep.GetAnnotations()
t, ok = annotations[vald.LastTimeSnapshotTimestampAnnotationsKey]
if ok {
lastSnapshotTime, err := time.Parse(vald.TimeFormat, t)
if err != nil {
return fmt.Errorf("parsing last snapshot time: %w", err)
}

Check warning on line 207 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L184-L207

Added lines #L184 - L207 were not covered by tests

if lastSnapshotTime.After(lastSavedTime) {
log.Info("snapshot taken after the last save. skipping...")
return nil
}

Check warning on line 212 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L209-L212

Added lines #L209 - L212 were not covered by tests
}

log.Infof("rotation required for agent id: %s. creating rotator job...", podIdx)
// TODO: check if the rotator job already exists or queued
// then create rotation job
return nil

Check warning on line 218 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L215-L218

Added lines #L215 - L218 were not covered by tests
}
Loading
Loading