Skip to content

Commit

Permalink
Adding context to backupstorage API.
Browse files Browse the repository at this point in the history
The only current user is the GCS one.
  • Loading branch information
alainjobart committed Nov 17, 2016
1 parent b599113 commit 9ccc238
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 97 deletions.
32 changes: 16 additions & 16 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func Backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, dir,
return err
}
defer bs.Close()
bh, err := bs.StartBackup(dir, name)
bh, err := bs.StartBackup(ctx, dir, name)
if err != nil {
return fmt.Errorf("StartBackup failed: %v", err)
}
Expand All @@ -213,10 +213,10 @@ func Backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, dir,
usable, err := backup(ctx, mysqld, logger, bh, backupConcurrency, hookExtraEnv)
var finishErr error
if usable {
finishErr = bh.EndBackup()
finishErr = bh.EndBackup(ctx)
} else {
logger.Errorf("backup is not usable, aborting it: %v", err)
finishErr = bh.AbortBackup()
finishErr = bh.AbortBackup(ctx)
}
if err != nil {
if finishErr != nil {
Expand Down Expand Up @@ -293,7 +293,7 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
}

// Backup everything, capture the error.
backupErr := backupFiles(mysqld, logger, bh, replicationPosition, backupConcurrency, hookExtraEnv)
backupErr := backupFiles(ctx, mysqld, logger, bh, replicationPosition, backupConcurrency, hookExtraEnv)
usable := backupErr == nil

// Try to restart mysqld
Expand Down Expand Up @@ -335,7 +335,7 @@ func backup(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh b
}

// backupFiles finds the list of files to backup, and creates the backup.
func backupFiles(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, replicationPosition replication.Position, backupConcurrency int, hookExtraEnv map[string]string) (err error) {
func backupFiles(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, replicationPosition replication.Position, backupConcurrency int, hookExtraEnv map[string]string) (err error) {
// Get the files to backup.
fes, err := findFilesTobackup(mysqld.Cnf())
if err != nil {
Expand All @@ -362,7 +362,7 @@ func backupFiles(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.Bac

// Backup the individual file.
name := fmt.Sprintf("%v", i)
rec.RecordError(backupFile(mysqld, logger, bh, &fes[i], name, hookExtraEnv))
rec.RecordError(backupFile(ctx, mysqld, logger, bh, &fes[i], name, hookExtraEnv))
}(i)
}

Expand All @@ -372,7 +372,7 @@ func backupFiles(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.Bac
}

// open the MANIFEST
wc, err := bh.AddFile(backupManifest)
wc, err := bh.AddFile(ctx, backupManifest)
if err != nil {
return fmt.Errorf("cannot add %v to backup: %v", backupManifest, err)
}
Expand Down Expand Up @@ -400,7 +400,7 @@ func backupFiles(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.Bac
}

// backupFile backs up an individual file.
func backupFile(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, fe *FileEntry, name string, hookExtraEnv map[string]string) (err error) {
func backupFile(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.BackupHandle, fe *FileEntry, name string, hookExtraEnv map[string]string) (err error) {
// Open the source file for reading.
var source *os.File
source, err = fe.open(mysqld.Cnf(), true)
Expand All @@ -410,7 +410,7 @@ func backupFile(mysqld MysqlDaemon, logger logutil.Logger, bh backupstorage.Back
defer source.Close()

// Open the destination file for writing, and a buffer.
wc, err := bh.AddFile(name)
wc, err := bh.AddFile(ctx, name)
if err != nil {
return fmt.Errorf("cannot add file: %v", err)
}
Expand Down Expand Up @@ -526,7 +526,7 @@ func checkNoDB(ctx context.Context, mysqld MysqlDaemon, dbName string) (bool, er

// restoreFiles will copy all the files from the BackupStorage to the
// right place.
func restoreFiles(cnf *Mycnf, bh backupstorage.BackupHandle, fes []FileEntry, filter string, restoreConcurrency int, hookExtraEnv map[string]string) error {
func restoreFiles(ctx context.Context, cnf *Mycnf, bh backupstorage.BackupHandle, fes []FileEntry, filter string, restoreConcurrency int, hookExtraEnv map[string]string) error {
sema := sync2.NewSemaphore(restoreConcurrency, 0)
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}
Expand All @@ -545,18 +545,18 @@ func restoreFiles(cnf *Mycnf, bh backupstorage.BackupHandle, fes []FileEntry, fi

// And restore the file.
name := fmt.Sprintf("%v", i)
rec.RecordError(restoreFile(cnf, bh, &fes[i], filter, name, hookExtraEnv))
rec.RecordError(restoreFile(ctx, cnf, bh, &fes[i], filter, name, hookExtraEnv))
}(i)
}
wg.Wait()
return rec.Error()
}

// restoreFile restores an individual file.
func restoreFile(cnf *Mycnf, bh backupstorage.BackupHandle, fe *FileEntry, filter string, name string, hookExtraEnv map[string]string) (err error) {
func restoreFile(ctx context.Context, cnf *Mycnf, bh backupstorage.BackupHandle, fe *FileEntry, filter string, name string, hookExtraEnv map[string]string) (err error) {
// Open the source file for reading.
var source io.ReadCloser
source, err = bh.ReadFile(name)
source, err = bh.ReadFile(ctx, name)
if err != nil {
return err
}
Expand Down Expand Up @@ -717,7 +717,7 @@ func Restore(
}
defer bs.Close()

bhs, err := bs.ListBackups(dir)
bhs, err := bs.ListBackups(ctx, dir)
if err != nil {
return replication.Position{}, fmt.Errorf("ListBackups failed: %v", err)
}
Expand All @@ -736,7 +736,7 @@ func Restore(
var toRestore int
for toRestore = len(bhs) - 1; toRestore >= 0; toRestore-- {
bh = bhs[toRestore]
rc, err := bh.ReadFile(backupManifest)
rc, err := bh.ReadFile(ctx, backupManifest)
if err != nil {
log.Warningf("Possibly incomplete backup %v in directory %v on BackupStorage: can't read MANIFEST: %v)", bh.Name(), dir, err)
continue
Expand Down Expand Up @@ -792,7 +792,7 @@ func Restore(
}

logger.Infof("Restore: copying all files")
if err := restoreFiles(mysqld.Cnf(), bh, bm.FileEntries, bm.Filter, restoreConcurrency, hookExtraEnv); err != nil {
if err := restoreFiles(ctx, mysqld.Cnf(), bh, bm.FileEntries, bm.Filter, restoreConcurrency, hookExtraEnv); err != nil {
return replication.Position{}, err
}

Expand Down
31 changes: 19 additions & 12 deletions go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"flag"
"fmt"
"io"

"golang.org/x/net/context"
)

var (
Expand All @@ -33,21 +35,25 @@ type BackupHandle interface {
// characters and hyphens.
// It should be thread safe, it is possible to call AddFile in
// multiple go routines once a backup has been started.
AddFile(filename string) (io.WriteCloser, error)
// The context is valid for the duration of the writes, until the
// WriteCloser is closed.
AddFile(ctx context.Context, filename string) (io.WriteCloser, error)

// EndBackup stops and closes a backup. The contents should be kept.
// Only works for read-write backups (created by StartBackup).
EndBackup() error
EndBackup(ctx context.Context) error

// AbortBackup stops a backup, and removes the contents that
// have been copied already. It is called if an error occurs
// while the backup is being taken, and the backup cannot be finished.
// Only works for read-write backups (created by StartBackup).
AbortBackup() error
AbortBackup(ctx context.Context) error

// ReadFile starts reading a file from a backup.
// Only works for read-only backups (created by ListBackups).
ReadFile(filename string) (io.ReadCloser, error)
// The context is valid for the duration of the reads, until the
// ReadCloser is closed.
ReadFile(ctx context.Context, filename string) (io.ReadCloser, error)
}

// BackupStorage is the interface to the storage system
Expand All @@ -57,22 +63,23 @@ type BackupStorage interface {
// AddFile/EndBackup/AbortBackup cannot).
// The backups are string-sorted by Name(), ascending (ends up
// being the oldest backup first).
ListBackups(dir string) ([]BackupHandle, error)
ListBackups(ctx context.Context, dir string) ([]BackupHandle, error)

// StartBackup creates a new backup with the given name. If a
// backup with the same name already exists, it's an error.
// The returned backup is read-write
// (AddFile/EndBackup/AbortBackup cann all be called, not
// ReadFile)
StartBackup(dir, name string) (BackupHandle, error)
// (AddFile/EndBackup/AbortBackup can all be called, not
// ReadFile). The provided context is only valid for that
// function, and should not be remembered by the implementation.
StartBackup(ctx context.Context, dir, name string) (BackupHandle, error)

// RemoveBackup removes all the data associated with a backup.
// It will not appear in ListBackups after RemoveBackup succeeds.
RemoveBackup(dir, name string) error
RemoveBackup(ctx context.Context, dir, name string) error

// Close frees resources associated with an active backup session,
// such as closing connections. Implementations of BackupStorage must support
// being reused after Close() is called.
// Close frees resources associated with an active backup
// session, such as closing connections. Implementations of
// BackupStorage must support being reused after Close() is called.
Close() error
}

Expand Down
18 changes: 10 additions & 8 deletions go/vt/mysqlctl/cephbackupstorage/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

log "github.com/golang/glog"
minio "github.com/minio/minio-go"
"golang.org/x/net/context"

"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/mysqlctl/backupstorage"
)
Expand Down Expand Up @@ -55,7 +57,7 @@ func (bh *CephBackupHandle) Name() string {
}

// AddFile implements BackupHandle.
func (bh *CephBackupHandle) AddFile(filename string) (io.WriteCloser, error) {
func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) {
if bh.readOnly {
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
Expand Down Expand Up @@ -83,7 +85,7 @@ func (bh *CephBackupHandle) AddFile(filename string) (io.WriteCloser, error) {
}

// EndBackup implements BackupHandle.
func (bh *CephBackupHandle) EndBackup() error {
func (bh *CephBackupHandle) EndBackup(ctx context.Context) error {
if bh.readOnly {
return fmt.Errorf("EndBackup cannot be called on read-only backup")
}
Expand All @@ -93,15 +95,15 @@ func (bh *CephBackupHandle) EndBackup() error {
}

// AbortBackup implements BackupHandle.
func (bh *CephBackupHandle) AbortBackup() error {
func (bh *CephBackupHandle) AbortBackup(ctx context.Context) error {
if bh.readOnly {
return fmt.Errorf("AbortBackup cannot be called on read-only backup")
}
return bh.bs.RemoveBackup(bh.dir, bh.name)
return bh.bs.RemoveBackup(ctx, bh.dir, bh.name)
}

// ReadFile implements BackupHandle.
func (bh *CephBackupHandle) ReadFile(filename string) (io.ReadCloser, error) {
func (bh *CephBackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) {
if !bh.readOnly {
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
}
Expand All @@ -121,7 +123,7 @@ type CephBackupStorage struct {
}

// ListBackups implements BackupStorage.
func (bs *CephBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHandle, error) {
func (bs *CephBackupStorage) ListBackups(ctx context.Context, dir string) ([]backupstorage.BackupHandle, error) {
c, err := bs.client()
if err != nil {
return nil, err
Expand Down Expand Up @@ -164,7 +166,7 @@ func (bs *CephBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHand
}

// StartBackup implements BackupStorage.
func (bs *CephBackupStorage) StartBackup(dir, name string) (backupstorage.BackupHandle, error) {
func (bs *CephBackupStorage) StartBackup(ctx context.Context, dir, name string) (backupstorage.BackupHandle, error) {
c, err := bs.client()
if err != nil {
return nil, err
Expand Down Expand Up @@ -192,7 +194,7 @@ func (bs *CephBackupStorage) StartBackup(dir, name string) (backupstorage.Backup
}

// RemoveBackup implements BackupStorage.
func (bs *CephBackupStorage) RemoveBackup(dir, name string) error {
func (bs *CephBackupStorage) RemoveBackup(ctx context.Context, dir, name string) error {
c, err := bs.client()
if err != nil {
return err
Expand Down
18 changes: 10 additions & 8 deletions go/vt/mysqlctl/filebackupstorage/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"os"
"path"

"golang.org/x/net/context"

"github.com/youtube/vitess/go/vt/mysqlctl/backupstorage"
)

Expand Down Expand Up @@ -42,7 +44,7 @@ func (fbh *FileBackupHandle) Name() string {
}

// AddFile is part of the BackupHandle interface
func (fbh *FileBackupHandle) AddFile(filename string) (io.WriteCloser, error) {
func (fbh *FileBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) {
if fbh.readOnly {
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
Expand All @@ -51,23 +53,23 @@ func (fbh *FileBackupHandle) AddFile(filename string) (io.WriteCloser, error) {
}

// EndBackup is part of the BackupHandle interface
func (fbh *FileBackupHandle) EndBackup() error {
func (fbh *FileBackupHandle) EndBackup(ctx context.Context) error {
if fbh.readOnly {
return fmt.Errorf("EndBackup cannot be called on read-only backup")
}
return nil
}

// AbortBackup is part of the BackupHandle interface
func (fbh *FileBackupHandle) AbortBackup() error {
func (fbh *FileBackupHandle) AbortBackup(ctx context.Context) error {
if fbh.readOnly {
return fmt.Errorf("AbortBackup cannot be called on read-only backup")
}
return fbh.fbs.RemoveBackup(fbh.dir, fbh.name)
return fbh.fbs.RemoveBackup(ctx, fbh.dir, fbh.name)
}

// ReadFile is part of the BackupHandle interface
func (fbh *FileBackupHandle) ReadFile(filename string) (io.ReadCloser, error) {
func (fbh *FileBackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) {
if !fbh.readOnly {
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
}
Expand All @@ -79,7 +81,7 @@ func (fbh *FileBackupHandle) ReadFile(filename string) (io.ReadCloser, error) {
type FileBackupStorage struct{}

// ListBackups is part of the BackupStorage interface
func (fbs *FileBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHandle, error) {
func (fbs *FileBackupStorage) ListBackups(ctx context.Context, dir string) ([]backupstorage.BackupHandle, error) {
// ReadDir already sorts the results
p := path.Join(*FileBackupStorageRoot, dir)
fi, err := ioutil.ReadDir(p)
Expand Down Expand Up @@ -109,7 +111,7 @@ func (fbs *FileBackupStorage) ListBackups(dir string) ([]backupstorage.BackupHan
}

// StartBackup is part of the BackupStorage interface
func (fbs *FileBackupStorage) StartBackup(dir, name string) (backupstorage.BackupHandle, error) {
func (fbs *FileBackupStorage) StartBackup(ctx context.Context, dir, name string) (backupstorage.BackupHandle, error) {
// Make sure the directory exists.
p := path.Join(*FileBackupStorageRoot, dir)
if err := os.MkdirAll(p, os.ModePerm); err != nil {
Expand All @@ -131,7 +133,7 @@ func (fbs *FileBackupStorage) StartBackup(dir, name string) (backupstorage.Backu
}

// RemoveBackup is part of the BackupStorage interface
func (fbs *FileBackupStorage) RemoveBackup(dir, name string) error {
func (fbs *FileBackupStorage) RemoveBackup(ctx context.Context, dir, name string) error {
p := path.Join(*FileBackupStorageRoot, dir, name)
return os.RemoveAll(p)
}
Expand Down
Loading

0 comments on commit 9ccc238

Please sign in to comment.