Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BackupController: do as much as possible #250

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 47 additions & 16 deletions pkg/cloudprovider/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,32 +116,63 @@ func NewBackupService(objectStore ObjectStore, logger logrus.FieldLogger) Backup
}
}

func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
// upload metadata file
metadataKey := getMetadataKey(backupName)
if err := br.objectStore.PutObject(bucket, metadataKey, metadata); err != nil {
// failure to upload metadata file is a hard-stop
return err
func seekToBeginning(r io.Reader) error {
seeker, ok := r.(io.Seeker)
if !ok {
return nil
}

// upload tar file
if err := br.objectStore.PutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
// try to delete the metadata file since the data upload failed
deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
_, err := seeker.Seek(0, 0)
return err
}

func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) error {
if file == nil {
return nil
}

return kerrors.NewAggregate([]error{err, deleteErr})
if err := seekToBeginning(file); err != nil {
return errors.WithStack(err)
}

// uploading log file is best-effort; if it fails, we log the error but call the overall upload a
// success
return br.objectStore.PutObject(bucket, key, file)
}

func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the
// backup's status.
logKey := getBackupLogKey(backupName)
if err := br.objectStore.PutObject(bucket, logKey, log); err != nil {
if err := br.seekAndPutObject(bucket, logKey, log); err != nil {
br.logger.WithError(err).WithFields(logrus.Fields{
"bucket": bucket,
"key": logKey,
}).Error("Error uploading log file")
}

if metadata == nil {
// If we don't have metadata, something failed, and there's no point in continuing. An object
// storage bucket that is missing the metadata file can't be restored, nor can its logs be
// viewed.
return nil
}

// upload metadata file
metadataKey := getMetadataKey(backupName)
if err := br.seekAndPutObject(bucket, metadataKey, metadata); err != nil {
// failure to upload metadata file is a hard-stop
return err
}

if backup != nil {
// upload tar file
if err := br.seekAndPutObject(bucket, getBackupContentsKey(backupName), backup); err != nil {
// try to delete the metadata file since the data upload failed
deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably makes sense to leave the metadata file here, right? For the same reason that we're uploading it for failed backups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was planning to ask you about that. I'm happy to remove this code!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle the situation where the backup completed successfully, we were able to upload the metadata, but uploading the tarball failed for some reason? What you'd see is a completed backup, with logs, but no ability to restore it... TODO for later, or fix now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't you see Failed on the API object? this func would return an error to the controller, and then runBackup would return an error to processBackup which would log it and mark it as failed. If this is true, I think it's still not ideal but reasonably obvious enough that no further changes would be needed for now,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my scenario, the json file in object storage has Completed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, but the backup obj in etcd has Failed, right? so ark backup get would show Failed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd have to go back through the various places where status is changed to failed to confirm. Also, if you were to sync from object storage into a new kube cluster, it would come in as completed...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. idk, what do you think makes sense? we could remove everything from obj storage on error, or re-upload metadata with a Failed status, or...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thoughts on where to leave this for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot about this thread. Let me page it back in and think about it.


return kerrors.NewAggregate([]error{err, deleteErr})
}
}

return nil
}

Expand Down Expand Up @@ -173,8 +204,8 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
return output, nil
}

func (br *backupService) GetBackup(bucket, name string) (*api.Backup, error) {
key := fmt.Sprintf(metadataFileFormatString, name)
func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, error) {
key := getMetadataKey(backupName)

res, err := br.objectStore.GetObject(bucket, key)
if err != nil {
Expand Down
36 changes: 23 additions & 13 deletions pkg/cloudprovider/backup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,36 +46,46 @@ func TestUploadBackup(t *testing.T) {
expectMetadataDelete bool
backup io.ReadSeeker
backupError error
expectBackupUpload bool
log io.ReadSeeker
logError error
expectedErr string
}{
{
name: "normal case",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
log: newStringReadSeeker("baz"),
name: "normal case",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
log: newStringReadSeeker("baz"),
},
{
name: "error on metadata upload does not upload data or log",
name: "error on metadata upload does not upload data",
metadata: newStringReadSeeker("foo"),
metadataError: errors.New("md"),
log: newStringReadSeeker("baz"),
expectedErr: "md",
},
{
name: "error on data upload deletes metadata",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see prev comment re: whether to delete metadata file in this case

metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
backupError: errors.New("backup"),
expectMetadataDelete: true,
expectedErr: "backup",
},
{
name: "error on log upload is ok",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
log: newStringReadSeeker("baz"),
logError: errors.New("log"),
name: "error on log upload is ok",
metadata: newStringReadSeeker("foo"),
backup: newStringReadSeeker("bar"),
expectBackupUpload: true,
log: newStringReadSeeker("baz"),
logError: errors.New("log"),
},
{
name: "don't upload data when metadata is nil",
backup: newStringReadSeeker("bar"),
log: newStringReadSeeker("baz"),
},
}

Expand All @@ -87,11 +97,12 @@ func TestUploadBackup(t *testing.T) {
backupName = "test-backup"
logger = arktest.NewLogger()
)
defer objStore.AssertExpectations(t)

if test.metadata != nil {
objStore.On("PutObject", bucket, backupName+"/ark-backup.json", test.metadata).Return(test.metadataError)
}
if test.backup != nil {
if test.backup != nil && test.expectBackupUpload {
objStore.On("PutObject", bucket, backupName+"/"+backupName+".tar.gz", test.backup).Return(test.backupError)
}
if test.log != nil {
Expand All @@ -111,7 +122,6 @@ func TestUploadBackup(t *testing.T) {
assert.NoError(t, err)
}

objStore.AssertExpectations(t)
})
}
}
Expand Down Expand Up @@ -372,5 +382,5 @@ func newStringReadSeeker(s string) *stringReadSeeker {
}

func (srs *stringReadSeeker) Seek(offset int64, whence int) (int64, error) {
panic("not implemented")
return 0, nil
}
101 changes: 48 additions & 53 deletions pkg/controller/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sync"
Expand All @@ -32,7 +33,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
kuberrs "k8s.io/apimachinery/pkg/util/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -211,13 +212,14 @@ func (controller *backupController) processBackup(key string) error {
return errors.Wrap(err, "error getting backup")
}

// TODO I think this is now unnecessary. We only initially place
// item with Phase = ("" | New) into the queue. Items will only get
// re-queued if syncHandler returns an error, which will only
// happen if there's an error updating Phase from its initial
// state to something else. So any time it's re-queued it will
// still have its initial state, which we've already confirmed
// is ("" | New)
// Double-check we have the correct phase. In the unlikely event that multiple controller
// instances are running, it's possible for controller A to succeed in changing the phase to
// InProgress, while controller B's attempt to patch the phase fails. When controller B
// reprocesses the same backup, it will either show up as New (informer hasn't seen the update
// yet) or as InProgress. In the former case, the patch attempt will fail again, until the
// informer sees the update. In the latter case, after the informer has seen the update to
// InProgress, we still need this check so we can return nil to indicate we've finished processing
// this key (even though it was a no-op).
switch backup.Status.Phase {
case "", api.BackupPhaseNew:
// only process new backups
Expand Down Expand Up @@ -317,70 +319,63 @@ func (controller *backupController) getValidationErrors(itm *api.Backup) []strin
}

func (controller *backupController) runBackup(backup *api.Backup, bucket string) error {
backupFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for Backup")
}
log := controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup))
log.Info("Starting backup")

logFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for Backup log")
return errors.Wrap(err, "error creating temp file for backup log")
}
defer closeAndRemoveFile(logFile, log)

defer func() {
var errs []error
// TODO should this be wrapped?
errs = append(errs, err)

if err := backupFile.Close(); err != nil {
errs = append(errs, errors.Wrap(err, "error closing Backup temp file"))
}

if err := os.Remove(backupFile.Name()); err != nil {
errs = append(errs, errors.Wrap(err, "error removing Backup temp file"))
}

if err := logFile.Close(); err != nil {
errs = append(errs, errors.Wrap(err, "error closing Backup log temp file"))
}

if err := os.Remove(logFile.Name()); err != nil {
errs = append(errs, errors.Wrap(err, "error removing Backup log temp file"))
}

err = kuberrs.NewAggregate(errs)
}()
backupFile, err := ioutil.TempFile("", "")
if err != nil {
return errors.Wrap(err, "error creating temp file for backup")
}
defer closeAndRemoveFile(backupFile, log)

actions, err := controller.pluginManager.GetBackupItemActions(backup.Name)
if err != nil {
return err
}
defer controller.pluginManager.CloseBackupItemActions(backup.Name)

controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("starting backup")
var errs []error

var backupJsonToUpload, backupFileToUpload io.Reader

// Do the actual backup
if err := controller.backupper.Backup(backup, backupFile, logFile, actions); err != nil {
return err
}
errs = append(errs, err)

controller.logger.WithField("backup", kubeutil.NamespaceAndName(backup)).Info("backup completed")
backup.Status.Phase = api.BackupPhaseFailed
} else {
backup.Status.Phase = api.BackupPhaseCompleted
}

// note: updating this here so the uploaded JSON shows "completed". If
// the upload fails, we'll alter the phase in the calling func.
backup.Status.Phase = api.BackupPhaseCompleted
backupJson := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", backupJson); err != nil {
errs = append(errs, errors.Wrap(err, "error encoding backup"))
} else {
// Only upload the json and backup tarball if encoding to json succeeded.
backupJsonToUpload = backupJson
backupFileToUpload = backupFile
}

buf := new(bytes.Buffer)
if err := encode.EncodeTo(backup, "json", buf); err != nil {
return errors.Wrap(err, "error encoding Backup")
if err := controller.backupService.UploadBackup(bucket, backup.Name, backupJsonToUpload, backupFileToUpload, logFile); err != nil {
errs = append(errs, err)
}

// re-set the files' offset to 0 for reading
if _, err = backupFile.Seek(0, 0); err != nil {
return errors.Wrap(err, "error resetting Backup file offset")
log.Info("Backup completed")

return kerrors.NewAggregate(errs)
}

func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) {
if err := file.Close(); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error closing file")
}
if _, err = logFile.Seek(0, 0); err != nil {
return errors.Wrap(err, "error resetting Backup log file offset")
if err := os.Remove(file.Name()); err != nil {
log.WithError(err).WithField("file", file.Name()).Error("error removing file")
}

return controller.backupService.UploadBackup(bucket, backup.Name, buf, backupFile, logFile)
}