Skip to content

Commit

Permalink
Check the PVB status via podvolume Backupper rather than API server t…
Browse files Browse the repository at this point in the history
…o avoid timeout issue

Check the PVB status via podvolume Backupper rather than API server to avoid ti
meout issue

Fixes vmware-tanzu#8587

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
  • Loading branch information
ywk253100 committed Jan 9, 2025
1 parent 7db8761 commit 6384f27
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 38 deletions.
46 changes: 15 additions & 31 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -301,7 +299,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

var podVolumeBackupper podvolume.Backupper
if kb.podVolumeBackupperFactory != nil {
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, backupRequest.Backup, kb.uploaderType)
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType)
if err != nil {
log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
return errors.WithStack(err)
Expand Down Expand Up @@ -743,35 +741,21 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
}

func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}

podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}

pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for i, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue
for _, pod := range pods {
namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
pvbs, err := itemBlock.itemBackupper.podVolumeBackupper.ListPodVolumeBackupsByPod(namespace, name)
if err != nil {
return errors.Wrapf(err, "failed to list PodVolumeBackups for pod %s/%s", namespace, name)
}

processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
for _, pvb := range pvbs {
processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
}
pvbMap[pvb] = processed
}
pvbMap[&pvbList.Items[i]] = processed
}

checkFunc := func(context.Context) (done bool, err error) {
Expand All @@ -780,8 +764,8 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
if processed {
continue
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
if err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue
Expand Down
52 changes: 48 additions & 4 deletions pkg/podvolume/backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error)
ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error)
}

type backupper struct {
Expand All @@ -59,7 +61,7 @@ type backupper struct {
pvbInformer ctrlcache.Informer
handlerRegistration cache.ResourceEventHandlerRegistration
wg sync.WaitGroup
result []*velerov1api.PodVolumeBackup
pvbIndexer cache.Indexer
}

type skippedPVC struct {
Expand Down Expand Up @@ -101,8 +103,22 @@ func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
}
}

const indexNamePod = "POD"

func podIndexFunc(obj interface{}) ([]string, error) {
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
}
if pvb == nil {
return nil, errors.New("PodVolumeBackup is nil")
}
return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil
}

func newBackupper(
ctx context.Context,
log logrus.FieldLogger,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
pvbInformer ctrlcache.Informer,
Expand All @@ -118,7 +134,9 @@ func newBackupper(
uploaderType: uploaderType,
pvbInformer: pvbInformer,
wg: sync.WaitGroup{},
result: []*velerov1api.PodVolumeBackup{},
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}

b.handlerRegistration, _ = pvbInformer.AddEventHandler(
Expand All @@ -135,7 +153,9 @@ func newBackupper(
return
}

b.result = append(b.result, pvb)
if err := b.pvbIndexer.Update(pvb); err != nil {
log.WithError(err).Error("failed to update PVB in indexer")
}
b.wg.Done()
},
},
Expand Down Expand Up @@ -312,6 +332,12 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
continue
}
b.wg.Add(1)

if err := b.pvbIndexer.Add(volumeBackup); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to create PodVolumeBackup for pvc %s", pvc.Spec.VolumeName))
continue
}

podVolumeBackups = append(podVolumeBackups, volumeBackup)
pvcSummary.addBackedup(volumeName)
}
Expand All @@ -337,7 +363,8 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
case <-b.ctx.Done():
log.Error("timed out waiting for all PodVolumeBackups to complete")
case <-done:
for _, pvb := range b.result {
for _, obj := range b.pvbIndexer.List() {
pvb := obj.(*velerov1api.PodVolumeBackup)
podVolumeBackups = append(podVolumeBackups, pvb)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
Expand All @@ -347,6 +374,23 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
return podVolumeBackups
}

func (b *backupper) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) {
obj, _, err := b.pvbIndexer.GetByKey(cache.NewObjectName(namespace, name).String())
return obj.(*velerov1api.PodVolumeBackup), err
}

func (b *backupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) {
objs, err := b.pvbIndexer.ByIndex(indexNamePod, cache.NewObjectName(podNamespace, podName).String())
if err != nil {
return nil, err
}
var pvbs []*velerov1api.PodVolumeBackup
for _, obj := range objs {
pvbs = append(pvbs, obj.(*velerov1api.PodVolumeBackup))
}
return pvbs, nil
}

func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
for _, volumeName := range volumesToBackup {
log.WithError(err).Warnf("Skip pod volume %s", volumeName)
Expand Down
6 changes: 3 additions & 3 deletions pkg/podvolume/backupper_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// BackupperFactory can construct pod volumes backuppers.
type BackupperFactory interface {
// NewBackupper returns a pod volumes backupper for use during a single Velero backup.
NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error)
NewBackupper(context.Context, logrus.FieldLogger, *velerov1api.Backup, string) (Backupper, error)
}

func NewBackupperFactory(
Expand All @@ -59,8 +59,8 @@ type backupperFactory struct {
log logrus.FieldLogger
}

func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
func (bf *backupperFactory) NewBackupper(ctx context.Context, log logrus.FieldLogger, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, log, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)

if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
return nil, errors.New("timed out waiting for caches to sync")
Expand Down

0 comments on commit 6384f27

Please sign in to comment.