From b12df50d4fd9c1974985125aff106f8bcec75ab5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?=
Date: Thu, 9 Jan 2025 14:42:34 +0800
Subject: [PATCH] Check the PVB status via podvolume Backupper rather than
 calling the API server to avoid API server issues
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Check the PVB status via podvolume Backupper rather than calling the API server to avoid API server issues

Fixes #8587

Signed-off-by: Wenkai Yin(尹文开)
---
 changelogs/unreleased/8596-ywk253100 |  1 +
 pkg/backup/backup.go                 | 48 ++++++---------
 pkg/backup/backup_test.go            | 20 +++++-
 pkg/podvolume/backupper.go           | 62 +++++++++++++++++--
 pkg/podvolume/backupper_factory.go   |  6 +-
 pkg/podvolume/backupper_test.go      | 91 +++++++++++++++++++++++++++-
 6 files changed, 187 insertions(+), 41 deletions(-)
 create mode 100644 changelogs/unreleased/8596-ywk253100

diff --git a/changelogs/unreleased/8596-ywk253100 b/changelogs/unreleased/8596-ywk253100
new file mode 100644
index 0000000000..7c7510e824
--- /dev/null
+++ b/changelogs/unreleased/8596-ywk253100
@@ -0,0 +1 @@
+Check the PVB status via podvolume Backupper rather than calling the API server to avoid API server issues
\ No newline at end of file
diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go
index c970e07f6a..7511dd427f 100644
--- a/pkg/backup/backup.go
+++ b/pkg/backup/backup.go
@@ -34,10 +34,8 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/apimachinery/pkg/selection"
 	kubeerrs "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/wait"
 	kbclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -301,7 +299,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

 	var podVolumeBackupper podvolume.Backupper
 	if kb.podVolumeBackupperFactory != nil {
-		podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, backupRequest.Backup, kb.uploaderType)
+		podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType)
 		if err != nil {
 			log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
 			return errors.WithStack(err)
@@ -729,6 +727,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
 	log := itemBlock.Log
 	defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

+	// the post hooks will not execute until all PVBs of the item block pods are processed
 	if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
 		log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
 		return
@@ -742,36 +741,23 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
 	}
 }

+// wait for all PVBs of the item block pods to be processed
 func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
-	requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
-	if err != nil {
-		return errors.Wrapf(err, "failed to create label requirement")
-	}
-	options := &kbclient.ListOptions{
-		LabelSelector: labels.NewSelector().Add(*requirement),
-	}
-	pvbList := &velerov1api.PodVolumeBackupList{}
-	if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
-		return errors.Wrap(err, "failed to list PVBs")
-	}
-
-	podMap := map[string]struct{}{}
-	for _, pod := range pods {
-		podMap[string(pod.Item.GetUID())] = struct{}{}
-	}
-
 	pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
-	for i, pvb := range pvbList.Items {
-		if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
-			continue
+	for _, pod := range pods {
+		namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
+		pvbs, err := itemBlock.itemBackupper.podVolumeBackupper.ListPodVolumeBackupsByPod(namespace, name)
+		if err != nil {
+			return errors.Wrapf(err, "failed to list PodVolumeBackups for pod %s/%s", namespace, name)
 		}
-
-		processed := false
-		if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
-			pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
-			processed = true
+		for _, pvb := range pvbs {
+			processed := false
+			if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
+				pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
+				processed = true
+			}
+			pvbMap[pvb] = processed
 		}
-		pvbMap[&pvbList.Items[i]] = processed
 	}

 	checkFunc := func(context.Context) (done bool, err error) {
@@ -780,8 +766,8 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
 			if processed {
 				continue
 			}
-			updatedPVB := &velerov1api.PodVolumeBackup{}
-			if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
+			updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
+			if err != nil {
 				allProcessed = false
 				log.Infof("failed to get PVB: %v", err)
 				continue
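The backup.go change above keeps the existing polling structure (a checkFunc driven by a wait loop) but sources PVB state from the podvolume Backupper instead of listing and getting PVBs through the API server. Below is a minimal, self-contained sketch of that polling pattern under stated assumptions: the pvbGetter interface, the pollPVBsProcessed name, and the 100ms interval are illustrative and not part of this patch.

```go
// Sketch of polling PVB phases through an in-memory getter (such as the
// podvolume Backupper) instead of querying the API server on every tick.
package example

import (
	"context"
	"time"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

// pvbGetter is a hypothetical narrow view of the podvolume Backupper.
type pvbGetter interface {
	GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error)
}

// isProcessed reports whether a PVB has reached a terminal phase.
func isProcessed(phase velerov1api.PodVolumeBackupPhase) bool {
	return phase == velerov1api.PodVolumeBackupPhaseCompleted ||
		phase == velerov1api.PodVolumeBackupPhaseFailed
}

// pollPVBsProcessed blocks until every PVB is Completed or Failed, or ctx ends.
func pollPVBsProcessed(ctx context.Context, getter pvbGetter, pvbs []*velerov1api.PodVolumeBackup) error {
	processed := map[*velerov1api.PodVolumeBackup]bool{}
	for _, pvb := range pvbs {
		processed[pvb] = isProcessed(pvb.Status.Phase)
	}

	check := func(context.Context) (bool, error) {
		done := true
		for pvb, ok := range processed {
			if ok {
				continue
			}
			// Re-read the PVB from the in-memory store to pick up status updates.
			updated, err := getter.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
			if err != nil || updated == nil {
				done = false // keep polling; the PVB may not be tracked yet
				continue
			}
			if isProcessed(updated.Status.Phase) {
				processed[pvb] = true
			} else {
				done = false
			}
		}
		return done, nil
	}

	// Poll every 100ms (interval chosen for illustration) until done or ctx is canceled.
	return wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, check)
}
```

The point of the design is that the getter reads from an informer-fed in-memory store, so repeated polling adds no load to the API server.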
diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go
index d646783bbd..ad4ec64964 100644
--- a/pkg/backup/backup_test.go
+++ b/pkg/backup/backup_test.go
@@ -3913,7 +3913,7 @@ func TestBackupWithHooks(t *testing.T) {

 type fakePodVolumeBackupperFactory struct{}

-func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, *velerov1.Backup, string) (podvolume.Backupper, error) {
+func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, logrus.FieldLogger, *velerov1.Backup, string) (podvolume.Backupper, error) {
 	return &fakePodVolumeBackupper{}, nil
 }

@@ -3946,6 +3946,24 @@ func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogg
 	return b.pvbs
 }

+func (b *fakePodVolumeBackupper) GetPodVolumeBackup(namespace, name string) (*velerov1.PodVolumeBackup, error) {
+	for _, pvb := range b.pvbs {
+		if pvb.Namespace == namespace && pvb.Name == name {
+			return pvb, nil
+		}
+	}
+	return nil, nil
+}
+func (b *fakePodVolumeBackupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1.PodVolumeBackup, error) {
+	var pvbs []*velerov1.PodVolumeBackup
+	for _, pvb := range b.pvbs {
+		if pvb.Spec.Pod.Namespace == podNamespace && pvb.Spec.Pod.Name == podName {
+			pvbs = append(pvbs, pvb)
+		}
+	}
+	return pvbs, nil
+}
+
 // TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
 // and ensures that the pod volume backupper is called, that the returned PodVolumeBackups
 // are added to the Request object, and that when PVCs are backed up with PodVolume, the
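The fake backupper above gives the backup tests the same lookup contract as the real implementation: ListPodVolumeBackupsByPod filters by the PVB's Spec.Pod, and GetPodVolumeBackup returns (nil, nil) rather than an error when nothing matches, so callers must nil-check. The standalone sketch below restates that contract; the miniFake type and the sample objects are illustrative, not part of the patch.

```go
// Standalone illustration of the lookup contract implemented by the fake
// backupper in backup_test.go. Not the patch's code.
package main

import (
	"fmt"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type miniFake struct {
	pvbs []*velerov1api.PodVolumeBackup
}

// GetPodVolumeBackup returns (nil, nil) when the PVB is unknown, not an error.
func (f *miniFake) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) {
	for _, pvb := range f.pvbs {
		if pvb.Namespace == namespace && pvb.Name == name {
			return pvb, nil
		}
	}
	return nil, nil
}

// ListPodVolumeBackupsByPod filters by the pod recorded in the PVB spec.
func (f *miniFake) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) {
	var out []*velerov1api.PodVolumeBackup
	for _, pvb := range f.pvbs {
		if pvb.Spec.Pod.Namespace == podNamespace && pvb.Spec.Pod.Name == podName {
			out = append(out, pvb)
		}
	}
	return out, nil
}

func main() {
	fake := &miniFake{pvbs: []*velerov1api.PodVolumeBackup{{
		ObjectMeta: metav1.ObjectMeta{Namespace: "velero", Name: "pvb-1"},
		Spec: velerov1api.PodVolumeBackupSpec{
			Pod: corev1.ObjectReference{Namespace: "ns-1", Name: "pod-1"},
		},
	}}}

	pvbs, _ := fake.ListPodVolumeBackupsByPod("ns-1", "pod-1")
	fmt.Println(len(pvbs)) // 1

	missing, _ := fake.GetPodVolumeBackup("velero", "absent")
	fmt.Println(missing == nil) // true
}
```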
diff --git a/pkg/podvolume/backupper.go b/pkg/podvolume/backupper.go
index 0a0c63eff1..784c27383a 100644
--- a/pkg/podvolume/backupper.go
+++ b/pkg/podvolume/backupper.go
@@ -48,6 +48,8 @@ type Backupper interface {
 	// BackupPodVolumes backs up all specified volumes in a pod.
 	BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
 	WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
+	GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error)
+	ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error)
 }

 type backupper struct {
@@ -59,7 +61,10 @@ type backupper struct {
 	pvbInformer         ctrlcache.Informer
 	handlerRegistration cache.ResourceEventHandlerRegistration
 	wg                  sync.WaitGroup
-	result              []*velerov1api.PodVolumeBackup
+	// pvbIndexer holds all PVBs created by this backupper and can look them up
+	// quickly by specific properties thanks to the embedded indexes.
+	// The statuses of the PVBs are updated when the informer receives update events.
+	pvbIndexer cache.Indexer
 }

 type skippedPVC struct {
@@ -101,8 +106,22 @@ func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
 	}
 }

+const indexNamePod = "POD"
+
+func podIndexFunc(obj interface{}) ([]string, error) {
+	pvb, ok := obj.(*velerov1api.PodVolumeBackup)
+	if !ok {
+		return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
+	}
+	if pvb == nil {
+		return nil, errors.New("PodVolumeBackup is nil")
+	}
+	return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil
+}
+
 func newBackupper(
 	ctx context.Context,
+	log logrus.FieldLogger,
 	repoLocker *repository.RepoLocker,
 	repoEnsurer *repository.Ensurer,
 	pvbInformer ctrlcache.Informer,
@@ -118,7 +137,9 @@ func newBackupper(
 		uploaderType: uploaderType,
 		pvbInformer:  pvbInformer,
 		wg:           sync.WaitGroup{},
-		result:       []*velerov1api.PodVolumeBackup{},
+		pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
+			indexNamePod: podIndexFunc,
+		}),
 	}

 	b.handlerRegistration, _ = pvbInformer.AddEventHandler(
@@ -135,7 +156,10 @@ func newBackupper(
 				return
 			}

-			b.result = append(b.result, pvb)
+			// the indexer inserts the PVB directly if the PVB to be updated doesn't exist
+			if err := b.pvbIndexer.Update(pvb); err != nil {
+				log.WithError(err).Error("failed to update PVB in indexer")
+			}
 			b.wg.Done()
 		},
 	},
@@ -312,6 +336,12 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
 			continue
 		}
 		b.wg.Add(1)
+
+		if err := b.pvbIndexer.Add(volumeBackup); err != nil {
+			errs = append(errs, errors.Wrapf(err, "failed to add the PodVolumeBackup of pvc %s to indexer", pvc.Spec.VolumeName))
+			continue
+		}
+
 		podVolumeBackups = append(podVolumeBackups, volumeBackup)
 		pvcSummary.addBackedup(volumeName)
 	}
@@ -337,7 +367,8 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
 	case <-b.ctx.Done():
 		log.Error("timed out waiting for all PodVolumeBackups to complete")
 	case <-done:
-		for _, pvb := range b.result {
+		for _, obj := range b.pvbIndexer.List() {
+			pvb := obj.(*velerov1api.PodVolumeBackup)
 			podVolumeBackups = append(podVolumeBackups, pvb)
 			if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
 				log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
@@ -347,6 +378,29 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
 	return podVolumeBackups
 }

+func (b *backupper) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) {
+	obj, exist, err := b.pvbIndexer.GetByKey(cache.NewObjectName(namespace, name).String())
+	if err != nil {
+		return nil, err
+	}
+	if !exist {
+		return nil, nil
+	}
+	return obj.(*velerov1api.PodVolumeBackup), nil
+}
+
+func (b *backupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) {
+	objs, err := b.pvbIndexer.ByIndex(indexNamePod, cache.NewObjectName(podNamespace, podName).String())
+	if err != nil {
+		return nil, err
+	}
+	var pvbs []*velerov1api.PodVolumeBackup
+	for _, obj := range objs {
+		pvbs = append(pvbs, obj.(*velerov1api.PodVolumeBackup))
+	}
+	return pvbs, nil
+}
+
 func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
 	for _, volumeName := range volumesToBackup {
 		log.WithError(err).Warnf("Skip pod volume %s", volumeName)
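The pvbIndexer introduced above is a client-go cache.Indexer: an in-memory, thread-safe store keyed by namespace/name via cache.MetaNamespaceKeyFunc, with a secondary index keyed by the owning pod, so PVBs can be found without touching the API server. Below is a condensed, self-contained sketch of that mechanism; the byPod index name and the sample object are illustrative (the patch itself uses the "POD" index name and podIndexFunc shown above).

```go
// Condensed illustration of the client-go Indexer mechanism the patch relies on:
// objects are stored in memory, keyed by namespace/name, with a secondary index
// on the PVB's pod for fast "all PVBs of this pod" lookups.
package main

import (
	"fmt"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

const byPod = "byPod" // illustrative index name

// podIndex returns the "namespace/name" of the pod a PVB belongs to.
func podIndex(obj interface{}) ([]string, error) {
	pvb, ok := obj.(*velerov1api.PodVolumeBackup)
	if !ok {
		return nil, fmt.Errorf("expected PodVolumeBackup, got %T", obj)
	}
	return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil
}

func main() {
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{byPod: podIndex})

	pvb := &velerov1api.PodVolumeBackup{
		ObjectMeta: metav1.ObjectMeta{Namespace: "velero", Name: "backup1-abc"},
		Spec: velerov1api.PodVolumeBackupSpec{
			Pod: corev1.ObjectReference{Namespace: "app", Name: "nginx-0"},
		},
	}
	// Add stores the object; Update upserts it when informer events arrive later.
	if err := indexer.Add(pvb); err != nil {
		panic(err)
	}

	// Primary lookup by namespace/name.
	if obj, exists, _ := indexer.GetByKey("velero/backup1-abc"); exists {
		fmt.Println(obj.(*velerov1api.PodVolumeBackup).Name)
	}

	// Secondary lookup: all PVBs that belong to pod app/nginx-0.
	matches, _ := indexer.ByIndex(byPod, cache.NewObjectName("app", "nginx-0").String())
	fmt.Println(len(matches)) // 1
}
```

Because the informer's update events feed pvbIndexer.Update, reads served from the indexer reflect PVB status changes without extra GET calls to the API server.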
diff --git a/pkg/podvolume/backupper_factory.go b/pkg/podvolume/backupper_factory.go
index fb166f1105..f75f1d30b6 100644
--- a/pkg/podvolume/backupper_factory.go
+++ b/pkg/podvolume/backupper_factory.go
@@ -32,7 +32,7 @@ import (
 // BackupperFactory can construct pod volumes backuppers.
 type BackupperFactory interface {
 	// NewBackupper returns a pod volumes backupper for use during a single Velero backup.
-	NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error)
+	NewBackupper(context.Context, logrus.FieldLogger, *velerov1api.Backup, string) (Backupper, error)
 }

 func NewBackupperFactory(
@@ -59,8 +59,8 @@ type backupperFactory struct {
 	log logrus.FieldLogger
 }

-func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
-	b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
+func (bf *backupperFactory) NewBackupper(ctx context.Context, log logrus.FieldLogger, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
+	b := newBackupper(ctx, log, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)

 	if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
 		return nil, errors.New("timed out waiting for caches to sync")
diff --git a/pkg/podvolume/backupper_test.go b/pkg/podvolume/backupper_test.go
index fe50f9e30e..a824daa70b 100644
--- a/pkg/podvolume/backupper_test.go
+++ b/pkg/podvolume/backupper_test.go
@@ -307,6 +307,7 @@ func TestBackupPodVolumes(t *testing.T) {
 	scheme := runtime.NewScheme()
 	velerov1api.AddToScheme(scheme)
 	corev1api.AddToScheme(scheme)
+	log := logrus.New()

 	tests := []struct {
 		name string
@@ -556,7 +557,7 @@ func TestBackupPodVolumes(t *testing.T) {
 			backupObj.Spec.StorageLocation = test.bsl

 			factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, fakeCtrlClient, pvbInformer, velerotest.NewLogger())
-			bp, err := factory.NewBackupper(ctx, backupObj, test.uploaderType)
+			bp, err := factory.NewBackupper(ctx, log, backupObj, test.uploaderType)

 			require.NoError(t, err)

@@ -581,6 +582,91 @@ func TestBackupPodVolumes(t *testing.T) {
 	}
 }

+func TestGetPodVolumeBackup(t *testing.T) {
+	backupper := &backupper{
+		pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
+			indexNamePod: podIndexFunc,
+		}),
+	}
+
+	obj := &velerov1api.PodVolumeBackup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "velero",
+			Name:      "pvb",
+		},
+		Spec: velerov1api.PodVolumeBackupSpec{
+			Pod: corev1api.ObjectReference{
+				Kind:      "Pod",
+				Namespace: "default",
+				Name:      "pod",
+			},
+		},
+	}
+
+	err := backupper.pvbIndexer.Add(obj)
+	require.NoError(t, err)
+
+	// PVB that doesn't exist
+	pvb, err := backupper.GetPodVolumeBackup("invalid-namespace", "invalid-name")
+	require.NoError(t, err)
+	assert.Nil(t, pvb)
+
+	// PVB that exists
+	pvb, err = backupper.GetPodVolumeBackup("velero", "pvb")
+	require.NoError(t, err)
+	assert.NotNil(t, pvb)
+}
+
+func TestListPodVolumeBackupsByPod(t *testing.T) {
+	backupper := &backupper{
+		pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
+			indexNamePod: podIndexFunc,
+		}),
+	}
+
+	obj1 := &velerov1api.PodVolumeBackup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "velero",
+			Name:      "pvb1",
+		},
+		Spec: velerov1api.PodVolumeBackupSpec{
+			Pod: corev1api.ObjectReference{
+				Kind:      "Pod",
+				Namespace: "default",
+				Name:      "pod",
+			},
+		},
+	}
+	obj2 := &velerov1api.PodVolumeBackup{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "velero",
+			Name:      "pvb2",
+		},
+		Spec: velerov1api.PodVolumeBackupSpec{
+			Pod: corev1api.ObjectReference{
+				Kind:      "Pod",
+				Namespace: "default",
+				Name:      "pod",
+			},
+		},
+	}
+
+	err := backupper.pvbIndexer.Add(obj1)
+	require.NoError(t, err)
+	err = backupper.pvbIndexer.Add(obj2)
+	require.NoError(t, err)
+
+	// PVBs that don't exist
+	pvbs, err := backupper.ListPodVolumeBackupsByPod("invalid-namespace", "invalid-name")
+	require.NoError(t, err)
+	assert.Empty(t, pvbs)
+
+	// PVBs that exist
+	pvbs, err = backupper.ListPodVolumeBackupsByPod("default", "pod")
+	require.NoError(t, err)
+	assert.Len(t, pvbs, 2)
+}
+
 type logHook struct {
 	entry *logrus.Entry
 }

@@ -598,6 +684,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
 	defer func() {
 		cancelFunc()
 	}()
+	log := logrus.New()
 	cases := []struct {
 		name string
 		ctx  context.Context
@@ -653,7 +740,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
 			logHook := &logHook{}
 			logger.Hooks.Add(logHook)

-			backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{})
+			backuper := newBackupper(c.ctx, log, nil, nil, informer, nil, "", &velerov1api.Backup{})
 			backuper.wg.Add(1)

 			if c.statusToBeUpdated != nil {
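Taken together, callers now obtain PVB state through the Backupper returned by the factory rather than through the API server. The sketch below shows a hypothetical caller of the new surface; the function name and its parameters are illustrative, and error handling is trimmed to the essentials.

```go
// Hypothetical caller-side sketch (not part of the patch): the factory wires in
// a logger, and PVB lookups go through the Backupper's in-memory indexer.
package example

import (
	"context"

	"github.com/sirupsen/logrus"
	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"github.com/vmware-tanzu/velero/pkg/podvolume"
)

// logProcessedPVBs lists the PVBs of one pod and logs the ones already processed.
// The factory, backup, and uploaderType are assumed to be set up by the caller.
func logProcessedPVBs(
	ctx context.Context,
	log logrus.FieldLogger,
	factory podvolume.BackupperFactory,
	backup *velerov1api.Backup,
	uploaderType, podNamespace, podName string,
) error {
	backupper, err := factory.NewBackupper(ctx, log, backup, uploaderType)
	if err != nil {
		return err
	}

	// All PVBs created for the given pod, served from the indexer.
	pvbs, err := backupper.ListPodVolumeBackupsByPod(podNamespace, podName)
	if err != nil {
		return err
	}
	for _, pvb := range pvbs {
		// Re-read a single PVB to pick up status updates delivered by the informer.
		// GetPodVolumeBackup returns (nil, nil) when the PVB is not tracked.
		updated, err := backupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
		if err != nil || updated == nil {
			continue
		}
		if updated.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
			updated.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
			log.Infof("PVB %s/%s processed", updated.Namespace, updated.Name)
		}
	}
	return nil
}
```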