Skip to content

Commit

Permalink
Check the PVB status via podvolume Backupper rather than calling API …
Browse files Browse the repository at this point in the history
…server to avoid API server issue

Check the PVB status via podvolume Backupper rather than calling API server to avoid API server issue

Fixes vmware-tanzu#8587

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
  • Loading branch information
ywk253100 committed Jan 10, 2025
1 parent 7db8761 commit b12df50
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 41 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8596-ywk253100
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Check the PVB status via podvolume Backupper rather than calling API server to avoid API server issue
48 changes: 17 additions & 31 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -301,7 +299,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

var podVolumeBackupper podvolume.Backupper
if kb.podVolumeBackupperFactory != nil {
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, backupRequest.Backup, kb.uploaderType)
podVolumeBackupper, err = kb.podVolumeBackupperFactory.NewBackupper(ctx, log, backupRequest.Backup, kb.uploaderType)
if err != nil {
log.WithError(errors.WithStack(err)).Debugf("Error from NewBackupper")
return errors.WithStack(err)
Expand Down Expand Up @@ -729,6 +727,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

// the post hooks will not execute until all PVBs of the item block pods are processed
if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
Expand All @@ -742,36 +741,23 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
}
}

// wait all PVBs of the item block pods to be processed
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}

podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}

pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for i, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue
for _, pod := range pods {
namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
pvbs, err := itemBlock.itemBackupper.podVolumeBackupper.ListPodVolumeBackupsByPod(namespace, name)
if err != nil {
return errors.Wrapf(err, "failed to list PodVolumeBackups for pod %s/%s", namespace, name)
}

processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
for _, pvb := range pvbs {
processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
}
pvbMap[pvb] = processed
}
pvbMap[&pvbList.Items[i]] = processed
}

checkFunc := func(context.Context) (done bool, err error) {
Expand All @@ -780,8 +766,8 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
if processed {
continue
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
updatedPVB, err := itemBlock.itemBackupper.podVolumeBackupper.GetPodVolumeBackup(pvb.Namespace, pvb.Name)
if err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue
Expand Down
20 changes: 19 additions & 1 deletion pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3913,7 +3913,7 @@ func TestBackupWithHooks(t *testing.T) {

type fakePodVolumeBackupperFactory struct{}

func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, *velerov1.Backup, string) (podvolume.Backupper, error) {
func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, logrus.FieldLogger, *velerov1.Backup, string) (podvolume.Backupper, error) {
return &fakePodVolumeBackupper{}, nil
}

Expand Down Expand Up @@ -3946,6 +3946,24 @@ func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogg
return b.pvbs
}

func (b *fakePodVolumeBackupper) GetPodVolumeBackup(namespace, name string) (*velerov1.PodVolumeBackup, error) {
for _, pvb := range b.pvbs {
if pvb.Namespace == namespace && pvb.Name == name {
return pvb, nil
}
}
return nil, nil
}
func (b *fakePodVolumeBackupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1.PodVolumeBackup, error) {
var pvbs []*velerov1.PodVolumeBackup
for _, pvb := range b.pvbs {
if pvb.Spec.Pod.Namespace == podNamespace && pvb.Spec.Pod.Name == podName {
pvbs = append(pvbs, pvb)
}
}
return pvbs, nil
}

// TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
// and ensures that the pod volume backupper is called, that the returned PodVolumeBackups
// are added to the Request object, and that when PVCs are backed up with PodVolume, the
Expand Down
62 changes: 58 additions & 4 deletions pkg/podvolume/backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Backupper interface {
// BackupPodVolumes backs up all specified volumes in a pod.
BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error)
ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error)
}

type backupper struct {
Expand All @@ -59,7 +61,10 @@ type backupper struct {
pvbInformer ctrlcache.Informer
handlerRegistration cache.ResourceEventHandlerRegistration
wg sync.WaitGroup
result []*velerov1api.PodVolumeBackup
// pvbIndexer holds all PVBs created by this backuper and is capable to search
// the PVBs based on specific properties quickly because of the embedded indexes.
// The statuses of the PVBs are got updated when Informer receives update events.
pvbIndexer cache.Indexer
}

type skippedPVC struct {
Expand Down Expand Up @@ -101,8 +106,22 @@ func (pbs *PVCBackupSummary) addSkipped(volumeName string, reason string) {
}
}

const indexNamePod = "POD"

func podIndexFunc(obj interface{}) ([]string, error) {
pvb, ok := obj.(*velerov1api.PodVolumeBackup)
if !ok {
return nil, errors.Errorf("expected PodVolumeBackup, but got %T", obj)
}
if pvb == nil {
return nil, errors.New("PodVolumeBackup is nil")
}
return []string{cache.NewObjectName(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name).String()}, nil
}

func newBackupper(
ctx context.Context,
log logrus.FieldLogger,
repoLocker *repository.RepoLocker,
repoEnsurer *repository.Ensurer,
pvbInformer ctrlcache.Informer,
Expand All @@ -118,7 +137,9 @@ func newBackupper(
uploaderType: uploaderType,
pvbInformer: pvbInformer,
wg: sync.WaitGroup{},
result: []*velerov1api.PodVolumeBackup{},
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}

b.handlerRegistration, _ = pvbInformer.AddEventHandler(
Expand All @@ -135,7 +156,10 @@ func newBackupper(
return
}

b.result = append(b.result, pvb)
// the Indexer inserts PVB directly if the PVB to be updated doesn't exist
if err := b.pvbIndexer.Update(pvb); err != nil {
log.WithError(err).Error("failed to update PVB in indexer")
}
b.wg.Done()
},
},
Expand Down Expand Up @@ -312,6 +336,12 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
continue
}
b.wg.Add(1)

if err := b.pvbIndexer.Add(volumeBackup); err != nil {
errs = append(errs, errors.Wrapf(err, "failed to create PodVolumeBackup for pvc %s", pvc.Spec.VolumeName))
continue
}

podVolumeBackups = append(podVolumeBackups, volumeBackup)
pvcSummary.addBackedup(volumeName)
}
Expand All @@ -337,7 +367,8 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
case <-b.ctx.Done():
log.Error("timed out waiting for all PodVolumeBackups to complete")
case <-done:
for _, pvb := range b.result {
for _, obj := range b.pvbIndexer.List() {
pvb := obj.(*velerov1api.PodVolumeBackup)
podVolumeBackups = append(podVolumeBackups, pvb)
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
Expand All @@ -347,6 +378,29 @@ func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velero
return podVolumeBackups
}

func (b *backupper) GetPodVolumeBackup(namespace, name string) (*velerov1api.PodVolumeBackup, error) {
obj, exist, err := b.pvbIndexer.GetByKey(cache.NewObjectName(namespace, name).String())
if err != nil {
return nil, err
}
if !exist {
return nil, nil
}
return obj.(*velerov1api.PodVolumeBackup), nil
}

func (b *backupper) ListPodVolumeBackupsByPod(podNamespace, podName string) ([]*velerov1api.PodVolumeBackup, error) {
objs, err := b.pvbIndexer.ByIndex(indexNamePod, cache.NewObjectName(podNamespace, podName).String())
if err != nil {
return nil, err
}
var pvbs []*velerov1api.PodVolumeBackup
for _, obj := range objs {
pvbs = append(pvbs, obj.(*velerov1api.PodVolumeBackup))
}
return pvbs, nil
}

func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
for _, volumeName := range volumesToBackup {
log.WithError(err).Warnf("Skip pod volume %s", volumeName)
Expand Down
6 changes: 3 additions & 3 deletions pkg/podvolume/backupper_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// BackupperFactory can construct pod volumes backuppers.
type BackupperFactory interface {
// NewBackupper returns a pod volumes backupper for use during a single Velero backup.
NewBackupper(context.Context, *velerov1api.Backup, string) (Backupper, error)
NewBackupper(context.Context, logrus.FieldLogger, *velerov1api.Backup, string) (Backupper, error)
}

func NewBackupperFactory(
Expand All @@ -59,8 +59,8 @@ type backupperFactory struct {
log logrus.FieldLogger
}

func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
func (bf *backupperFactory) NewBackupper(ctx context.Context, log logrus.FieldLogger, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
b := newBackupper(ctx, log, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)

if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
return nil, errors.New("timed out waiting for caches to sync")
Expand Down
91 changes: 89 additions & 2 deletions pkg/podvolume/backupper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func TestBackupPodVolumes(t *testing.T) {
scheme := runtime.NewScheme()
velerov1api.AddToScheme(scheme)
corev1api.AddToScheme(scheme)
log := logrus.New()

tests := []struct {
name string
Expand Down Expand Up @@ -556,7 +557,7 @@ func TestBackupPodVolumes(t *testing.T) {
backupObj.Spec.StorageLocation = test.bsl

factory := NewBackupperFactory(repository.NewRepoLocker(), ensurer, fakeCtrlClient, pvbInformer, velerotest.NewLogger())
bp, err := factory.NewBackupper(ctx, backupObj, test.uploaderType)
bp, err := factory.NewBackupper(ctx, log, backupObj, test.uploaderType)

require.NoError(t, err)

Expand All @@ -581,6 +582,91 @@ func TestBackupPodVolumes(t *testing.T) {
}
}

func TestGetPodVolumeBackup(t *testing.T) {
backupper := &backupper{
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}

obj := &velerov1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvb",
},
Spec: velerov1api.PodVolumeBackupSpec{
Pod: corev1api.ObjectReference{
Kind: "Pod",
Namespace: "default",
Name: "pod",
},
},
}

err := backupper.pvbIndexer.Add(obj)
require.NoError(t, err)

// not exist PVB
pvb, err := backupper.GetPodVolumeBackup("invalid-namespace", "invalid-name")
require.NoError(t, err)
assert.Nil(t, pvb)

// exist PVB
pvb, err = backupper.GetPodVolumeBackup("velero", "pvb")
require.NoError(t, err)
assert.NotNil(t, pvb)
}

func TestListPodVolumeBackupsByPodp(t *testing.T) {
backupper := &backupper{
pvbIndexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
indexNamePod: podIndexFunc,
}),
}

obj1 := &velerov1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvb1",
},
Spec: velerov1api.PodVolumeBackupSpec{
Pod: corev1api.ObjectReference{
Kind: "Pod",
Namespace: "default",
Name: "pod",
},
},
}
obj2 := &velerov1api.PodVolumeBackup{
ObjectMeta: metav1.ObjectMeta{
Namespace: "velero",
Name: "pvb2",
},
Spec: velerov1api.PodVolumeBackupSpec{
Pod: corev1api.ObjectReference{
Kind: "Pod",
Namespace: "default",
Name: "pod",
},
},
}

err := backupper.pvbIndexer.Add(obj1)
require.NoError(t, err)
err = backupper.pvbIndexer.Add(obj2)
require.NoError(t, err)

// not exist PVBs
pvbs, err := backupper.ListPodVolumeBackupsByPod("invalid-namespace", "invalid-name")
require.NoError(t, err)
assert.Empty(t, pvbs)

// exist PVBs
pvbs, err = backupper.ListPodVolumeBackupsByPod("default", "pod")
require.NoError(t, err)
assert.Len(t, pvbs, 2)
}

type logHook struct {
entry *logrus.Entry
}
Expand All @@ -598,6 +684,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
defer func() {
cancelFunc()
}()
log := logrus.New()
cases := []struct {
name string
ctx context.Context
Expand Down Expand Up @@ -653,7 +740,7 @@ func TestWaitAllPodVolumesProcessed(t *testing.T) {
logHook := &logHook{}
logger.Hooks.Add(logHook)

backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{})
backuper := newBackupper(c.ctx, log, nil, nil, informer, nil, "", &velerov1api.Backup{})
backuper.wg.Add(1)

if c.statusToBeUpdated != nil {
Expand Down

0 comments on commit b12df50

Please sign in to comment.