From 535f7b2d7e12f8cf662b24f971a756bced044b7e Mon Sep 17 00:00:00 2001 From: Yusuke Kadowaki Date: Wed, 21 Feb 2024 10:43:37 +0900 Subject: [PATCH] Add rotate-all option to rotator (#2305) * Add rotate-all option to rotator * Fix linter warnings * Make it possible to choose multiple rotation target * Add tests for parseReplicaID * Add tests for parseReplicaID * Hide testify in internal * style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in dd84674 according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2305 * Apply format * Lint * Lint --------- Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> Co-authored-by: Yusuke Kato --- internal/errors/rotator.go | 21 +++ internal/k8s/client/client.go | 6 + internal/test/mock/k8s/client.go | 71 ++++++++ internal/test/testify/testify.go | 26 +++ pkg/agent/core/faiss/service/option.go | 2 +- .../job/readreplica/rotate/service/rotator.go | 166 ++++++++++++------ .../rotate/service/rotator_test.go | 108 ++++++++++++ 7 files changed, 347 insertions(+), 53 deletions(-) create mode 100644 internal/errors/rotator.go create mode 100644 internal/test/mock/k8s/client.go create mode 100644 internal/test/testify/testify.go diff --git a/internal/errors/rotator.go b/internal/errors/rotator.go new file mode 100644 index 0000000000..ebf498f975 --- /dev/null +++ b/internal/errors/rotator.go @@ -0,0 +1,21 @@ +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Package errors provides error types and function +package errors + +// ErrReadReplicaIDEmpty represents error when trying to rotate agents with empty replicaID. +var ErrReadReplicaIDEmpty = New("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index bac11df061..1efcdf1915 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -23,6 +23,7 @@ import ( snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" "github.com/vdaas/vald/internal/errors" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,17 +53,22 @@ type ( MatchingLabels = cli.MatchingLabels InNamespace = cli.InNamespace VolumeSnapshot = snapshotv1.VolumeSnapshot + Deployment = appsv1.Deployment + DeploymentList = appsv1.DeploymentList + ObjectMeta = metav1.ObjectMeta ) const ( DeletePropagationBackground = metav1.DeletePropagationBackground WatchDeletedEvent = watch.Deleted SelectionOpEquals = selection.Equals + SelectionOpExists = selection.Exists ) var ( ServerSideApply = cli.Apply MergePatch = cli.Merge + NewSelector = labels.NewSelector ) type Client interface { diff --git a/internal/test/mock/k8s/client.go b/internal/test/mock/k8s/client.go new file mode 100644 index 0000000000..663b4b7f73 --- /dev/null +++ b/internal/test/mock/k8s/client.go @@ -0,0 +1,71 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package k8s + +import ( + "context" + + "github.com/stretchr/testify/mock" + "github.com/vdaas/vald/internal/k8s/client" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/watch" + crclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ValdK8sClientMock struct { + mock.Mock +} + +var _ client.Client = (*ValdK8sClientMock)(nil) + +func (m *ValdK8sClientMock) Get(ctx context.Context, name string, namespace string, obj client.Object, opts ...crclient.GetOption) error { + args := m.Called(ctx, name, namespace, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) List(ctx context.Context, list crclient.ObjectList, opts ...client.ListOption) error { + args := m.Called(ctx, list, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Delete(ctx context.Context, obj client.Object, opts ...crclient.DeleteOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Update(ctx context.Context, obj client.Object, opts ...crclient.UpdateOption) error { + args := m.Called(ctx, obj, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Patch(ctx context.Context, obj client.Object, patch crclient.Patch, opts ...crclient.PatchOption) error { + args := m.Called(ctx, obj, patch, opts) + return args.Error(0) +} + +func (m *ValdK8sClientMock) Watch(ctx context.Context, obj crclient.ObjectList, opts ...client.ListOption) (watch.Interface, error) { + args := m.Called(ctx, obj, opts) + return args.Get(0).(watch.Interface), args.Error(1) +} + +func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) { + args := m.Called(key, op, vals) + return args.Get(0).(labels.Selector), args.Error(1) +} diff --git a/internal/test/testify/testify.go b/internal/test/testify/testify.go new file mode 100644 index 0000000000..49a7a6193b --- /dev/null +++ b/internal/test/testify/testify.go @@ -0,0 +1,26 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package testify + +import ( + "github.com/stretchr/testify/mock" +) + +type ( + Arguments = mock.Arguments +) + +const ( + Anything = mock.Anything +) diff --git a/pkg/agent/core/faiss/service/option.go b/pkg/agent/core/faiss/service/option.go index 6f06556c35..7cc465683a 100644 --- a/pkg/agent/core/faiss/service/option.go +++ b/pkg/agent/core/faiss/service/option.go @@ -29,7 +29,7 @@ import ( "github.com/vdaas/vald/internal/timeutil" ) -// Option represent the functional option for faiss +// Option represent the functional option for faiss. type Option func(f *faiss) error var defaultOptions = []Option{ diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index 6ce4272d73..c478e12965 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -25,6 +25,7 @@ import ( "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" + "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync/errgroup" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -33,7 +34,8 @@ import ( ) const ( - apiName = "vald/index/job/readreplica/rotate" + apiName = "vald/index/job/readreplica/rotate" + rotateAllID = "rotate-all" ) // Rotator represents an interface for indexing. @@ -45,20 +47,20 @@ type rotator struct { namespace string volumeName string readReplicaLabelKey string - readReplicaID string - client client.Client - listOpts client.ListOptions + subProcesses []subProcess +} + +type subProcess struct { + listOpts client.ListOptions + client client.Client + volumeName string } // New returns Indexer object if no error occurs. +// replicaID must be a comma separated string of replica id or ${rotateAllID} to rotate all read replica at once. func New(replicaID string, opts ...Option) (Rotator, error) { r := new(rotator) - if replicaID == "" { - return nil, fmt.Errorf("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var") - } - r.readReplicaID = replicaID - for _, opt := range append(defaultOpts, opts...) { if err := opt(r); err != nil { oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) @@ -75,17 +77,20 @@ func New(replicaID string, opts ...Option) (Rotator, error) { if err != nil { return nil, fmt.Errorf("failed to create kubernetes client: %w", err) } - r.client = c - selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{r.readReplicaID}) + ids, err := r.parseReplicaID(replicaID, c) if err != nil { return nil, err } - r.listOpts = client.ListOptions{ - Namespace: r.namespace, - LabelSelector: selector, - } + for _, id := range ids { + sub, err := r.newSubprocess(c, id) + if err != nil { + return nil, fmt.Errorf("failed to create rotator subprocess: %w", err) + } + + r.subProcesses = append(r.subProcesses, sub) + } return r, nil } @@ -98,58 +103,81 @@ func (r *rotator) Start(ctx context.Context) error { } }() - if err := r.rotate(ctx); err != nil { - if span != nil { - span.RecordError(err) - span.SetStatus(trace.StatusError, err.Error()) - } - return err + eg, ectx := errgroup.New(ctx) + for _, sub := range r.subProcesses { + s := sub + eg.Go(safety.RecoverFunc(func() (err error) { + if err := s.rotate(ectx); err != nil { + if span != nil { + span.RecordError(err) + span.SetStatus(trace.StatusError, err.Error()) + } + return err + } + return nil + })) } - return nil + return eg.Wait() } -func (r *rotator) rotate(ctx context.Context) error { +func (r *rotator) newSubprocess(c client.Client, replicaID string) (subProcess, error) { + selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{replicaID}) + if err != nil { + return subProcess{}, err + } + sub := subProcess{ + client: c, + listOpts: client.ListOptions{ + Namespace: r.namespace, + LabelSelector: selector, + }, + volumeName: r.volumeName, + } + return sub, nil +} + +func (s *subProcess) rotate(ctx context.Context) error { // get deployment here to pass to create methods of snapshot and pvc // and put it as owner reference of them so that they will be deleted when the deployment is deleted - deployment, err := r.getDeployment(ctx) + deployment, err := s.getDeployment(ctx) if err != nil { log.Errorf("failed to get Deployment.") return err } - newSnap, oldSnap, err := r.createSnapshot(ctx, deployment) + newSnap, oldSnap, err := s.createSnapshot(ctx, deployment) if err != nil { return err } - newPvc, oldPvc, err := r.createPVC(ctx, newSnap.GetName(), deployment) + newPvc, oldPvc, err := s.createPVC(ctx, newSnap.GetName(), deployment) if err != nil { log.Errorf("failed to create PVC. removing the new snapshot(%s)...", newSnap.GetName()) - if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil { + if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil { errors.Join(err, dserr) } return err } - err = r.updateDeployment(ctx, newPvc.GetName(), deployment) + err = s.updateDeployment(ctx, newPvc.GetName(), deployment) if err != nil { log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName()) - if dperr := r.deletePVC(ctx, newPvc); dperr != nil { + if dperr := s.deletePVC(ctx, newPvc); dperr != nil { errors.Join(err, dperr) } - if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil { + if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil { errors.Join(err, dserr) } return err } - err = r.deleteSnapshot(ctx, oldSnap) + err = s.deleteSnapshot(ctx, oldSnap) if err != nil { return err } - err = r.deletePVC(ctx, oldPvc) + err = s.deletePVC(ctx, oldPvc) if err != nil { return err } @@ -157,9 +185,9 @@ func (r *rotator) rotate(ctx context.Context) error { return nil } -func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) { +func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) { list := snapshotv1.VolumeSnapshotList{} - if err := r.client.List(ctx, &list, &r.listOpts); err != nil { + if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, nil, fmt.Errorf("failed to get snapshot: %w", err) } if len(list.Items) == 0 { @@ -193,7 +221,7 @@ func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deploym log.Infof("creating new snapshot(%s)...", newSnap.GetName()) log.Debugf("snapshot detail: %#v", newSnap) - err = r.client.Create(ctx, newSnap) + err = s.client.Create(ctx, newSnap) if err != nil { return nil, nil, fmt.Errorf("failed to create snapshot: %w", err) } @@ -201,9 +229,9 @@ func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deploym return newSnap, oldSnap, nil } -func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) { +func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) { list := v1.PersistentVolumeClaimList{} - if err := r.client.List(ctx, &list, &r.listOpts); err != nil { + if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, nil, fmt.Errorf("failed to get PVC: %w", err) } if len(list.Items) == 0 { @@ -248,33 +276,33 @@ func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment log.Infof("creating new pvc(%s)...", newPvc.GetName()) log.Debugf("pvc detail: %#v", newPvc) - if err := r.client.Create(ctx, newPvc); err != nil { + if err := s.client.Create(ctx, newPvc); err != nil { return nil, nil, fmt.Errorf("failed to create PVC(%s): %w", newPvc.GetName(), err) } return newPvc, oldPvc, nil } -func (r *rotator) getDeployment(ctx context.Context) (*appsv1.Deployment, error) { +func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, error) { list := appsv1.DeploymentList{} - if err := r.client.List(ctx, &list, &r.listOpts); err != nil { + if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, fmt.Errorf("failed to get deployment through client: %w", err) } if len(list.Items) == 0 { - return nil, fmt.Errorf("no deployment found") + return nil, fmt.Errorf("no deployment found with the label(%s)", s.listOpts.LabelSelector) } return &list.Items[0], nil } -func (r *rotator) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error { +func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error { if deployment.Spec.Template.ObjectMeta.Annotations == nil { deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{} } deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().UTC().Format(time.RFC3339) for _, vol := range deployment.Spec.Template.Spec.Volumes { - if vol.Name == r.volumeName { + if vol.Name == s.volumeName { vol.PersistentVolumeClaim.ClaimName = newPVC } } @@ -282,19 +310,19 @@ func (r *rotator) updateDeployment(ctx context.Context, newPVC string, deploymen log.Infof("updating deployment(%s)...", deployment.GetName()) log.Debugf("deployment detail: %#v", deployment) - if err := r.client.Update(ctx, deployment); err != nil { + if err := s.client.Update(ctx, deployment); err != nil { return fmt.Errorf("failed to update deployment: %w", err) } return nil } -func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error { - watcher, err := r.client.Watch(ctx, +func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error { + watcher, err := s.client.Watch(ctx, &snapshotv1.VolumeSnapshotList{ Items: []snapshotv1.VolumeSnapshot{*snapshot}, }, - &r.listOpts, + &s.listOpts, ) if err != nil { return fmt.Errorf("failed to watch snapshot(%s): %w", snapshot.GetName(), err) @@ -320,18 +348,18 @@ func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.Volum } }) - if err := r.client.Delete(ctx, snapshot); err != nil { + if err := s.client.Delete(ctx, snapshot); err != nil { return fmt.Errorf("failed to delete snapshot: %w", err) } return eg.Wait() } -func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error { - watcher, err := r.client.Watch(ctx, +func (s *subProcess) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error { + watcher, err := s.client.Watch(ctx, &v1.PersistentVolumeClaimList{ Items: []v1.PersistentVolumeClaim{*pvc}, }, - &r.listOpts, + &s.listOpts, ) if err != nil { return fmt.Errorf("failed to watch PVC: %w", err) @@ -357,7 +385,7 @@ func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) } }) - if err := r.client.Delete(ctx, pvc); err != nil { + if err := s.client.Delete(ctx, pvc); err != nil { return fmt.Errorf("failed to delete PVC(%s): %w", pvc.GetName(), err) } @@ -375,3 +403,37 @@ func getNewBaseName(old string) string { } return newNameBase } + +func (r *rotator) parseReplicaID(replicaID string, c client.Client) ([]string, error) { + if replicaID == "" { + return nil, errors.ErrReadReplicaIDEmpty + } + + if replicaID == rotateAllID { + var deploymentList appsv1.DeploymentList + selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{}) + if err != nil { + return nil, err + } + if err := c.List(context.Background(), &deploymentList, &client.ListOptions{ + Namespace: r.namespace, + LabelSelector: selector, + }); err != nil { + return nil, fmt.Errorf("failed to List deployments in parseReplicaID: %w", err) + } + + deployments := deploymentList.Items + if len(deployments) == 0 { + return nil, fmt.Errorf("no read replica found to rotate") + } + + var ids []string + for i := range deployments { + deployment := &deployments[i] + ids = append(ids, deployment.Labels[r.readReplicaLabelKey]) + } + return ids, nil + } + + return strings.Split(replicaID, ","), nil +} diff --git a/pkg/index/job/readreplica/rotate/service/rotator_test.go b/pkg/index/job/readreplica/rotate/service/rotator_test.go index 3a975ae0f2..4369a884fc 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator_test.go +++ b/pkg/index/job/readreplica/rotate/service/rotator_test.go @@ -15,6 +15,12 @@ package service import ( "testing" + + "github.com/stretchr/testify/require" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/k8s/client" + "github.com/vdaas/vald/internal/test/mock/k8s" + "github.com/vdaas/vald/internal/test/testify" ) func Test_getNewBaseName(t *testing.T) { @@ -77,6 +83,108 @@ func Test_getNewBaseName(t *testing.T) { } } +func Test_parseReplicaID(t *testing.T) { + labelKey := "foo" + type args struct { + replicaID string + c client.Client + } + type want struct { + ids []string + err error + } + type test struct { + name string + args args + want want + } + tests := []test{ + { + name: "single replicaID", + args: args{ + replicaID: "0", + c: nil, + }, + want: want{ + ids: []string{"0"}, + err: nil, + }, + }, + { + name: "multiple replicaIDs", + args: args{ + replicaID: "0,1", + c: nil, + }, + want: want{ + ids: []string{"0", "1"}, + err: nil, + }, + }, + { + name: "returns error when replicaID is empty", + args: args{ + replicaID: "", + c: nil, + }, + want: want{ + ids: nil, + err: errors.ErrReadReplicaIDEmpty, + }, + }, + func() test { + wantID1 := "bar" + wantID2 := "baz" + mock := &k8s.ValdK8sClientMock{} + + mock.On("LabelSelector", testify.Anything, testify.Anything, testify.Anything).Return(client.NewSelector(), nil) + mock.On("List", testify.Anything, testify.Anything, testify.Anything).Run(func(args testify.Arguments) { + if depList, ok := args.Get(1).(*client.DeploymentList); ok { + depList.Items = []client.Deployment{ + { + ObjectMeta: client.ObjectMeta{ + Labels: map[string]string{ + labelKey: wantID1, + }, + }, + }, + { + ObjectMeta: client.ObjectMeta{ + Labels: map[string]string{ + labelKey: wantID2, + }, + }, + }, + } + } + }).Return(nil) + return test{ + name: "returns all ids when rotate-all option is set", + args: args{ + replicaID: rotateAllID, + c: mock, + }, + want: want{ + ids: []string{wantID1, wantID2}, + err: nil, + }, + } + }(), + } + for _, test := range tests { + tt := test + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + r := &rotator{ + readReplicaLabelKey: labelKey, + } + ids, err := r.parseReplicaID(tt.args.replicaID, tt.args.c) + require.Equal(t, tt.want.ids, ids) + require.Equal(t, tt.want.err, err) + }) + } +} + // NOT IMPLEMENTED BELOW // // func TestNew(t *testing.T) {