From 771d82c78f361d4d17da664bd221a976933c51fb Mon Sep 17 00:00:00 2001 From: Steve Kriss Date: Wed, 20 Jun 2018 15:46:41 -0700 Subject: [PATCH] refine what gets enqueued in PVB/PVR controllers, and log better Signed-off-by: Steve Kriss --- .../pod_volume_backup_controller.go | 54 ++++++-- .../pod_volume_restore_controller.go | 124 +++++++++--------- 2 files changed, 105 insertions(+), 73 deletions(-) diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 4a9a5200592..51c533ec857 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -86,17 +86,36 @@ func NewPodVolumeBackupController( podVolumeBackupInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueue, - UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) }, + AddFunc: c.pvbHandler, + UpdateFunc: func(_, obj interface{}) { c.pvbHandler(obj) }, }, ) return c } +func (c *podVolumeBackupController) pvbHandler(obj interface{}) { + req := obj.(*arkv1api.PodVolumeBackup) + + // only enqueue items for this node + if req.Spec.Node != c.nodeName { + return + } + + log := loggerForPodVolumeBackup(c.logger, req) + + if req.Status.Phase != "" && req.Status.Phase != arkv1api.PodVolumeBackupPhaseNew { + log.Debug("Backup is not new, not enqueuing") + return + } + + log.Debug("Enqueueing") + c.enqueue(obj) +} + func (c *podVolumeBackupController) processQueueItem(key string) error { log := c.logger.WithField("key", key) - log.Debug("Running processItem") + log.Debug("Running processQueueItem") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { @@ -120,22 +139,29 @@ func (c *podVolumeBackupController) processQueueItem(key string) error { return nil } - // only process items for this node - if req.Spec.Node != c.nodeName { - return nil - } - // Don't mutate the shared cache reqCopy := req.DeepCopy() return c.processBackupFunc(reqCopy) } -func (c 
*podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) error { - log := c.logger.WithFields(logrus.Fields{ +func loggerForPodVolumeBackup(baseLogger logrus.FieldLogger, req *arkv1api.PodVolumeBackup) logrus.FieldLogger { + log := baseLogger.WithFields(logrus.Fields{ "namespace": req.Namespace, "name": req.Name, }) + if len(req.OwnerReferences) == 1 { + log = log.WithField("arkBackup", req.OwnerReferences[0].Name) + } + + return log +} + +func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) error { + log := loggerForPodVolumeBackup(c.logger, req) + + log.Info("Backup starting") + var err error // update status to InProgress @@ -157,11 +183,15 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) return c.fail(req, errors.Wrap(err, "error getting volume directory name").Error(), log) } - path, err := singlePathMatch(fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir)) + pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir) + log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob") + + path, err := singlePathMatch(pathGlob) if err != nil { log.WithError(err).Error("Error uniquely identifying volume path") return c.fail(req, errors.Wrap(err, "error getting volume path on host").Error(), log) } + log.WithField("path", path).Debugf("Found path matching glob") // temp creds file, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace) @@ -204,6 +234,8 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) return err } + log.Info("Backup completed") + return nil } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index eb3dd2e5549..0ee668cd00d 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -111,98 +111,84 @@ func 
NewPodVolumeRestoreController( func (c *podVolumeRestoreController) pvrHandler(obj interface{}) { pvr := obj.(*arkv1api.PodVolumeRestore) - log := c.logger.WithField("key", kube.NamespaceAndName(pvr)) + log := loggerForPodVolumeRestore(c.logger, pvr) - if !shouldEnqueuePVR(pvr, c.podLister, c.nodeName, log) { + if !isPVRNew(pvr) { + log.Debugf("Restore is not new, not enqueuing") return } - log.Debug("enqueueing") - c.enqueue(obj) -} - -func (c *podVolumeRestoreController) podHandler(obj interface{}) { - pod := obj.(*corev1api.Pod) - log := c.logger.WithField("key", kube.NamespaceAndName(pod)) - - for _, pvr := range pvrsToEnqueueForPod(pod, c.podVolumeRestoreLister, c.nodeName, log) { - c.enqueue(pvr) + pod, err := c.podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name) + if err != nil { + log.WithError(err).Debugf("Unable to get restore's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name) + return } -} -func shouldProcessPod(pod *corev1api.Pod, nodeName string, log logrus.FieldLogger) bool { - // if the pod lister being used is filtered to pods on this node, this is superfluous, - // but retaining for safety. 
- if pod.Spec.NodeName != nodeName { - log.Debugf("Pod is scheduled on node %s, not enqueueing.", pod.Spec.NodeName) - return false + if !isPodOnNode(pod, c.nodeName) { + log.Debugf("Restore's pod is not on this node, not enqueuing") + return } - // only process items for pods that have the restic initContainer running - if !resticInitContainerRunning(pod) { - log.Debugf("Pod is not running restic initContainer, not enqueueing.") - return false + if !isResticInitContainerRunning(pod) { + log.Debug("Restore's pod is not running restic-wait init container, not enqueuing") + return } - return true + log.Debug("Enqueueing") + c.enqueue(obj) } -func shouldProcessPVR(pvr *arkv1api.PodVolumeRestore, log logrus.FieldLogger) bool { - // only process new items - if pvr.Status.Phase != "" && pvr.Status.Phase != arkv1api.PodVolumeRestorePhaseNew { - log.Debugf("Item has phase %s, not enqueueing.", pvr.Status.Phase) - return false - } - - return true -} +func (c *podVolumeRestoreController) podHandler(obj interface{}) { + pod := obj.(*corev1api.Pod) + log := c.logger.WithField("key", kube.NamespaceAndName(pod)) -func pvrsToEnqueueForPod(pod *corev1api.Pod, pvrLister listers.PodVolumeRestoreLister, nodeName string, log logrus.FieldLogger) []*arkv1api.PodVolumeRestore { - if !shouldProcessPod(pod, nodeName, log) { - return nil + // the pod should always be for this node since the podInformer is filtered + // based on node, so this is just a failsafe. 
+	if pod.Spec.NodeName != c.nodeName {
+		return
 	}
 
 	selector, err := labels.Parse(fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
 	if err != nil {
 		log.WithError(err).Error("Unable to parse label selector %s", fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
-		return nil
+		return
 	}
 
-	pvrs, err := pvrLister.List(selector)
+	pvrs, err := c.podVolumeRestoreLister.List(selector)
 	if err != nil {
 		log.WithError(err).Error("Unable to list pod volume restores")
-		return nil
+		return
 	}
 
-	var res []*arkv1api.PodVolumeRestore
-	for i, pvr := range pvrs {
-		if shouldProcessPVR(pvr, log) {
-			res = append(res, pvrs[i])
-		}
+	if len(pvrs) == 0 {
+		return
 	}
 
-	return res
-}
-
-func shouldEnqueuePVR(pvr *arkv1api.PodVolumeRestore, podLister corev1listers.PodLister, nodeName string, log logrus.FieldLogger) bool {
-	if !shouldProcessPVR(pvr, log) {
-		return false
+	if !isResticInitContainerRunning(pod) {
+		log.Debug("Pod is not running restic-wait init container, not enqueuing restores for pod")
+		return
 	}
 
-	pod, err := podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name)
-	if err != nil {
-		log.WithError(err).Errorf("Unable to get item's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
-		return false
+	for _, pvr := range pvrs {
+		log := loggerForPodVolumeRestore(log, pvr)
+		if !isPVRNew(pvr) {
+			log.Debug("Restore is not new, not enqueuing")
+			continue
+		}
+		log.Debug("Enqueuing")
+		c.enqueue(pvr)
 	}
+}
 
-	if !shouldProcessPod(pod, nodeName, log) {
-		return false
-	}
+func isPVRNew(pvr *arkv1api.PodVolumeRestore) bool {
+	return pvr.Status.Phase == "" || pvr.Status.Phase == arkv1api.PodVolumeRestorePhaseNew
+}
 
-	return true
+func isPodOnNode(pod *corev1api.Pod, node string) bool {
+	return pod.Spec.NodeName == node
 }
 
-func resticInitContainerRunning(pod *corev1api.Pod) bool {
+func isResticInitContainerRunning(pod *corev1api.Pod) bool {
 	// no init containers, or the first one is not the ark restic one: return false
 	if len(pod.Spec.InitContainers) == 
0 || pod.Spec.InitContainers[0].Name != restic.InitContainer { return false @@ -219,7 +205,7 @@ func resticInitContainerRunning(pod *corev1api.Pod) bool { func (c *podVolumeRestoreController) processQueueItem(key string) error { log := c.logger.WithField("key", key) - log.Debug("Running processItem") + log.Debug("Running processQueueItem") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { @@ -241,12 +227,24 @@ func (c *podVolumeRestoreController) processQueueItem(key string) error { return c.processRestoreFunc(reqCopy) } -func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeRestore) error { - log := c.logger.WithFields(logrus.Fields{ +func loggerForPodVolumeRestore(baseLogger logrus.FieldLogger, req *arkv1api.PodVolumeRestore) logrus.FieldLogger { + log := baseLogger.WithFields(logrus.Fields{ "namespace": req.Namespace, "name": req.Name, }) + if len(req.OwnerReferences) == 1 { + log = log.WithField("arkRestore", req.OwnerReferences[0].Name) + } + + return log +} + +func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeRestore) error { + log := loggerForPodVolumeRestore(c.logger, req) + + log.Info("Restore starting") + var err error // update status to InProgress @@ -288,6 +286,8 @@ func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeResto return err } + log.Info("Restore completed") + return nil }