Skip to content

Commit

Permalink
refine what gets enqueued in PVB/PVR controllers, and log better
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <steve@heptio.com>
  • Loading branch information
skriss committed Jun 22, 2018
1 parent 636b09a commit 771d82c
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 73 deletions.
54 changes: 43 additions & 11 deletions pkg/controller/pod_volume_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,36 @@ func NewPodVolumeBackupController(

podVolumeBackupInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueue,
UpdateFunc: func(_, obj interface{}) { c.enqueue(obj) },
AddFunc: c.pvbHandler,
UpdateFunc: func(_, obj interface{}) { c.pvbHandler(obj) },
},
)

return c
}

// pvbHandler is the informer event handler for PodVolumeBackups. It enqueues
// a backup only when it is assigned to this daemon's node and has not yet
// been processed (phase unset or New).
func (c *podVolumeBackupController) pvbHandler(obj interface{}) {
	pvb := obj.(*arkv1api.PodVolumeBackup)

	// Backups are executed by the daemon on the node hosting the pod's
	// volume; ignore anything assigned to a different node.
	if pvb.Spec.Node != c.nodeName {
		return
	}

	log := loggerForPodVolumeBackup(c.logger, pvb)

	// Skip backups that have already started or finished.
	switch pvb.Status.Phase {
	case "", arkv1api.PodVolumeBackupPhaseNew:
		// new (or not-yet-phased) backup: fall through and enqueue
	default:
		log.Debug("Backup is not new, not enqueuing")
		return
	}

	log.Debug("Enqueueing")
	c.enqueue(obj)
}

func (c *podVolumeBackupController) processQueueItem(key string) error {
log := c.logger.WithField("key", key)
log.Debug("Running processItem")
log.Debug("Running processQueueItem")

ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand All @@ -120,22 +139,29 @@ func (c *podVolumeBackupController) processQueueItem(key string) error {
return nil
}

// only process items for this node
if req.Spec.Node != c.nodeName {
return nil
}

// Don't mutate the shared cache
reqCopy := req.DeepCopy()
return c.processBackupFunc(reqCopy)
}

func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) error {
log := c.logger.WithFields(logrus.Fields{
// loggerForPodVolumeBackup returns baseLogger annotated with the backup's
// namespace and name, plus the owning Ark backup's name when the object has
// exactly one owner reference.
func loggerForPodVolumeBackup(baseLogger logrus.FieldLogger, req *arkv1api.PodVolumeBackup) logrus.FieldLogger {
	fields := logrus.Fields{
		"namespace": req.Namespace,
		"name":      req.Name,
	}
	log := baseLogger.WithFields(fields)

	if len(req.OwnerReferences) == 1 {
		return log.WithField("arkBackup", req.OwnerReferences[0].Name)
	}

	return log
}

func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup) error {
log := loggerForPodVolumeBackup(c.logger, req)

log.Info("Backup starting")

var err error

// update status to InProgress
Expand All @@ -157,11 +183,15 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup)
return c.fail(req, errors.Wrap(err, "error getting volume directory name").Error(), log)
}

path, err := singlePathMatch(fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir))
pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir)
log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")

path, err := singlePathMatch(pathGlob)
if err != nil {
log.WithError(err).Error("Error uniquely identifying volume path")
return c.fail(req, errors.Wrap(err, "error getting volume path on host").Error(), log)
}
log.WithField("path", path).Debugf("Found path matching glob")

// temp creds
file, err := restic.TempCredentialsFile(c.secretLister, req.Spec.Pod.Namespace)
Expand Down Expand Up @@ -204,6 +234,8 @@ func (c *podVolumeBackupController) processBackup(req *arkv1api.PodVolumeBackup)
return err
}

log.Info("Backup completed")

return nil
}

Expand Down
124 changes: 62 additions & 62 deletions pkg/controller/pod_volume_restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,98 +111,84 @@ func NewPodVolumeRestoreController(

func (c *podVolumeRestoreController) pvrHandler(obj interface{}) {
pvr := obj.(*arkv1api.PodVolumeRestore)
log := c.logger.WithField("key", kube.NamespaceAndName(pvr))
log := loggerForPodVolumeRestore(c.logger, pvr)

if !shouldEnqueuePVR(pvr, c.podLister, c.nodeName, log) {
if !isPVRNew(pvr) {
log.Debugf("Restore is not new, not enqueuing")
return
}

log.Debug("enqueueing")
c.enqueue(obj)
}

func (c *podVolumeRestoreController) podHandler(obj interface{}) {
pod := obj.(*corev1api.Pod)
log := c.logger.WithField("key", kube.NamespaceAndName(pod))

for _, pvr := range pvrsToEnqueueForPod(pod, c.podVolumeRestoreLister, c.nodeName, log) {
c.enqueue(pvr)
pod, err := c.podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name)
if err != nil {
log.WithError(err).Debugf("Unable to get restore's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
return
}
}

func shouldProcessPod(pod *corev1api.Pod, nodeName string, log logrus.FieldLogger) bool {
// if the pod lister being used is filtered to pods on this node, this is superfluous,
// but retaining for safety.
if pod.Spec.NodeName != nodeName {
log.Debugf("Pod is scheduled on node %s, not enqueueing.", pod.Spec.NodeName)
return false
if !isPodOnNode(pod, c.nodeName) {
log.Debugf("Restore's pod is not on this node, not enqueuing")
return
}

// only process items for pods that have the restic initContainer running
if !resticInitContainerRunning(pod) {
log.Debugf("Pod is not running restic initContainer, not enqueueing.")
return false
if !isResticInitContainerRunning(pod) {
log.Debug("Restore's pod is not running restic-wait init container, not enqueuing")
return
}

return true
log.Debug("Enqueueing")
c.enqueue(obj)
}

func shouldProcessPVR(pvr *arkv1api.PodVolumeRestore, log logrus.FieldLogger) bool {
// only process new items
if pvr.Status.Phase != "" && pvr.Status.Phase != arkv1api.PodVolumeRestorePhaseNew {
log.Debugf("Item has phase %s, not enqueueing.", pvr.Status.Phase)
return false
}

return true
}
func (c *podVolumeRestoreController) podHandler(obj interface{}) {
pod := obj.(*corev1api.Pod)
log := c.logger.WithField("key", kube.NamespaceAndName(pod))

func pvrsToEnqueueForPod(pod *corev1api.Pod, pvrLister listers.PodVolumeRestoreLister, nodeName string, log logrus.FieldLogger) []*arkv1api.PodVolumeRestore {
if !shouldProcessPod(pod, nodeName, log) {
return nil
// the pod should always be for this node since the podInformer is filtered
// based on node, so this is just a failsafe.
if pod.Spec.NodeName != c.nodeName {
return
}

selector, err := labels.Parse(fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
if err != nil {
log.WithError(err).Error("Unable to parse label selector %s", fmt.Sprintf("%s=%s", arkv1api.PodUIDLabel, pod.UID))
return nil
return
}

pvrs, err := pvrLister.List(selector)
pvrs, err := c.podVolumeRestoreLister.List(selector)
if err != nil {
log.WithError(err).Error("Unable to list pod volume restores")
return nil
return
}

var res []*arkv1api.PodVolumeRestore
for i, pvr := range pvrs {
if shouldProcessPVR(pvr, log) {
res = append(res, pvrs[i])
}
if len(pvrs) == 0 {
return
}

return res
}

func shouldEnqueuePVR(pvr *arkv1api.PodVolumeRestore, podLister corev1listers.PodLister, nodeName string, log logrus.FieldLogger) bool {
if !shouldProcessPVR(pvr, log) {
return false
if !isResticInitContainerRunning(pod) {
log.Debug("Pod is not running restic-wait init container, not enqueuing restores for pod")
return
}

pod, err := podLister.Pods(pvr.Spec.Pod.Namespace).Get(pvr.Spec.Pod.Name)
if err != nil {
log.WithError(err).Errorf("Unable to get item's pod %s/%s, not enqueueing.", pvr.Spec.Pod.Namespace, pvr.Spec.Pod.Name)
return false
for _, pvr := range pvrs {
log := loggerForPodVolumeRestore(log, pvr)
if !isPVRNew(pvr) {
log.Debug("Restore is not new, not enqueuing")
continue
}
log.Debug("Enqueuing")
c.enqueue(obj)
}
}

if !shouldProcessPod(pod, nodeName, log) {
return false
}
// isPVRNew reports whether the restore has not yet been picked up for
// processing: its phase is either unset or explicitly New.
func isPVRNew(pvr *arkv1api.PodVolumeRestore) bool {
	switch pvr.Status.Phase {
	case "", arkv1api.PodVolumeRestorePhaseNew:
		return true
	default:
		return false
	}
}

return true
// isPodOnNode reports whether the pod has been scheduled onto the given node.
func isPodOnNode(pod *corev1api.Pod, node string) bool {
	scheduledNode := pod.Spec.NodeName
	return scheduledNode == node
}

func resticInitContainerRunning(pod *corev1api.Pod) bool {
func isResticInitContainerRunning(pod *corev1api.Pod) bool {
// no init containers, or the first one is not the ark restic one: return false
if len(pod.Spec.InitContainers) == 0 || pod.Spec.InitContainers[0].Name != restic.InitContainer {
return false
Expand All @@ -219,7 +205,7 @@ func resticInitContainerRunning(pod *corev1api.Pod) bool {

func (c *podVolumeRestoreController) processQueueItem(key string) error {
log := c.logger.WithField("key", key)
log.Debug("Running processItem")
log.Debug("Running processQueueItem")

ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand All @@ -241,12 +227,24 @@ func (c *podVolumeRestoreController) processQueueItem(key string) error {
return c.processRestoreFunc(reqCopy)
}

func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeRestore) error {
log := c.logger.WithFields(logrus.Fields{
// loggerForPodVolumeRestore returns baseLogger annotated with the restore's
// namespace and name, plus the owning Ark restore's name when the object has
// exactly one owner reference.
func loggerForPodVolumeRestore(baseLogger logrus.FieldLogger, req *arkv1api.PodVolumeRestore) logrus.FieldLogger {
	fields := logrus.Fields{
		"namespace": req.Namespace,
		"name":      req.Name,
	}
	log := baseLogger.WithFields(fields)

	if len(req.OwnerReferences) == 1 {
		return log.WithField("arkRestore", req.OwnerReferences[0].Name)
	}

	return log
}

func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeRestore) error {
log := loggerForPodVolumeRestore(c.logger, req)

log.Info("Restore starting")

var err error

// update status to InProgress
Expand Down Expand Up @@ -288,6 +286,8 @@ func (c *podVolumeRestoreController) processRestore(req *arkv1api.PodVolumeResto
return err
}

log.Info("Restore completed")

return nil
}

Expand Down

0 comments on commit 771d82c

Please sign in to comment.