Skip to content

Commit

Permalink
storage pools: implement rsync.bwlimit property
Browse files Browse the repository at this point in the history
When rsync has to be invoked to transfer storage entities (e.g. on local or
remote migration) there is currently no way to place an upper limit on the
amount of socket I/O allowed. This can lead to problems on systems with limited
ram available. Starting with this commit LXD allows setting rsync.bwlimit to
place an upper limit on the amount of socket I/O allowed. This property is
storage pool specific rather than a global daemon config key. In this way,
users can grant important pools more I/O than less important pools.

Closes #2678.

Signed-off-by: Christian Brauner <christian.brauner@ubuntu.com>
  • Loading branch information
Christian Brauner committed Apr 18, 2017
1 parent f6d142b commit 47f4509
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 69 deletions.
6 changes: 5 additions & 1 deletion doc/api-extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,8 @@ and the "source" attribute is added to specify the device on host.
If "source" is set without a "path", we should assume that "path" will be the same as "source".
If "path" is set without "source" and "major/minor" isn't set,
we should assume that "source" will be the same as "path".
So at least one of them must be set.
So at least one of them must be set.

## rsync.bwlimit
When rsync has to be invoked to transfer storage entities setting rsync.bwlimit
places an upper limit on the amount of socket I/O allowed.
1 change: 1 addition & 0 deletions doc/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ volume.block.mount\_options | string | block based driver (lvm)
lvm.thinpool\_name | string | lvm driver | LXDPool | Thin pool where images and containers are created.
lvm.use\_thinpool | bool | lvm driver | true | Whether the storage pool uses a thinpool for logical volumes.
lvm.vg\_name | string | lvm driver | name of the pool | Name of the volume group to create.
rsync.bwlimit | string | - | 0 (no limit) | Specifies the upper limit to be placed on the socket I/O whenever rsync has to be used to transfer storage entities.
volume.size | string | appropriate driver | 0 | Default volume size
volume.zfs.remove\_snapshots | bool | zfs driver | false | Remove snapshots as needed
volume.zfs.use\_refquota | bool | zfs driver | false | Use refquota instead of quota for space.
Expand Down
4 changes: 4 additions & 0 deletions doc/storage-backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ When such capabilities aren't available, either because the storage driver doesn
or because the storage backend of the source and target servers differ,
LXD will fallback to using rsync to transfer the individual files instead.

When rsync has to be used LXD allows to specify an upper limit on the amount of
socket I/O by setting the "rsync.bwlimit" storage pool property to a non-zero
value.

## Default storage pool
There is no concept of a default storage pool in LXD.
Instead, the pool to use for the container's root is treated as just another "disk" device in LXD.
Expand Down
1 change: 1 addition & 0 deletions lxd/api_1.0.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func api10Get(d *Daemon, r *http.Request) Response {
"storage_zfs_clone_copy",
"unix_device_rename",
"storage_lvm_use_thinpool",
"storage_rsync_bwlimit",
},
APIStatus: "stable",
APIVersion: version.APIVersion,
Expand Down
2 changes: 1 addition & 1 deletion lxd/db_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func dbUpdateFromV11(currentVersion int, version int, d *Daemon) error {
// containers/<container>/snapshots/<snap0>
// to
// snapshots/<container>/<snap0>
output, err := storageRsyncCopy(oldPath, newPath)
output, err := rsyncLocalCopy(oldPath, newPath, "")
if err != nil {
logger.Error(
"Failed rsync snapshot",
Expand Down
13 changes: 10 additions & 3 deletions lxd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,18 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
return err
}

bwlimit := ""
if *header.Fs != myType {
myType = MigrationFSType_RSYNC
header.Fs = &myType

driver, _ = rsyncMigrationSource(s.container, s.containerOnly)

// Check if this storage pool has a rate limit set for rsync.
poolwritable := s.container.Storage().GetStoragePoolWritable()
if poolwritable.Config != nil {
bwlimit = poolwritable.Config["rsync.bwlimit"]
}
}

// All failure paths need to do a few things to correctly handle errors before returning.
Expand All @@ -394,7 +401,7 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
return err
}

err = driver.SendWhileRunning(s.fsConn, migrateOp)
err = driver.SendWhileRunning(s.fsConn, migrateOp, bwlimit)
if err != nil {
return abort(err)
}
Expand Down Expand Up @@ -515,12 +522,12 @@ func (s *migrationSourceWs) Do(migrateOp *operation) error {
* no reason to do these in parallel. In the future when we're using
* p.haul's protocol, it will make sense to do these in parallel.
*/
err = RsyncSend(shared.AddSlash(checkpointDir), s.criuConn, nil)
err = RsyncSend(shared.AddSlash(checkpointDir), s.criuConn, nil, bwlimit)
if err != nil {
return abort(err)
}

err = driver.SendAfterCheckpoint(s.fsConn)
err = driver.SendAfterCheckpoint(s.fsConn, bwlimit)
if err != nil {
return abort(err)
}
Expand Down
12 changes: 6 additions & 6 deletions lxd/patches.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func upgradeFromStorageTypeBtrfs(name string, d *Daemon, defaultPoolName string,
return err
}

output, err := storageRsyncCopy(oldContainerMntPoint, newContainerMntPoint)
output, err := rsyncLocalCopy(oldContainerMntPoint, newContainerMntPoint, "")
if err != nil {
logger.Errorf("Failed to rsync: %s: %s.", output, err)
return err
Expand Down Expand Up @@ -480,7 +480,7 @@ func upgradeFromStorageTypeBtrfs(name string, d *Daemon, defaultPoolName string,
return err
}

output, err := storageRsyncCopy(oldSnapshotMntPoint, newSnapshotMntPoint)
output, err := rsyncLocalCopy(oldSnapshotMntPoint, newSnapshotMntPoint, "")
if err != nil {
logger.Errorf("Failed to rsync: %s: %s.", output, err)
return err
Expand Down Expand Up @@ -684,7 +684,7 @@ func upgradeFromStorageTypeDir(name string, d *Daemon, defaultPoolName string, d
// First try to rename.
err := os.Rename(oldContainerMntPoint, newContainerMntPoint)
if err != nil {
output, err := storageRsyncCopy(oldContainerMntPoint, newContainerMntPoint)
output, err := rsyncLocalCopy(oldContainerMntPoint, newContainerMntPoint, "")
if err != nil {
logger.Errorf("Failed to rsync: %s: %s.", output, err)
return err
Expand Down Expand Up @@ -731,7 +731,7 @@ func upgradeFromStorageTypeDir(name string, d *Daemon, defaultPoolName string, d
if shared.PathExists(oldSnapshotMntPoint) && !shared.PathExists(newSnapshotMntPoint) {
err := os.Rename(oldSnapshotMntPoint, newSnapshotMntPoint)
if err != nil {
output, err := storageRsyncCopy(oldSnapshotMntPoint, newSnapshotMntPoint)
output, err := rsyncLocalCopy(oldSnapshotMntPoint, newSnapshotMntPoint, "")
if err != nil {
logger.Errorf("Failed to rsync: %s: %s.", output, err)
return err
Expand Down Expand Up @@ -1057,7 +1057,7 @@ func upgradeFromStorageTypeLvm(name string, d *Daemon, defaultPoolName string, d
}

// Use rsync to fill the empty volume.
output, err := storageRsyncCopy(oldContainerMntPoint, newContainerMntPoint)
output, err := rsyncLocalCopy(oldContainerMntPoint, newContainerMntPoint, "")
if err != nil {
ctStorage.ContainerDelete(ctStruct)
return fmt.Errorf("rsync failed: %s", string(output))
Expand Down Expand Up @@ -1211,7 +1211,7 @@ func upgradeFromStorageTypeLvm(name string, d *Daemon, defaultPoolName string, d
}

// Use rsync to fill the empty volume.
output, err := storageRsyncCopy(oldSnapshotMntPoint, newSnapshotMntPoint)
output, err := rsyncLocalCopy(oldSnapshotMntPoint, newSnapshotMntPoint, "")
if err != nil {
csStorage.ContainerDelete(csStruct)
return fmt.Errorf("rsync failed: %s", string(output))
Expand Down
46 changes: 40 additions & 6 deletions lxd/rsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,36 @@ import (
"github.com/lxc/lxd/shared/logger"
)

func rsyncSendSetup(path string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {
// rsyncCopy copies a directory using rsync (with the --devices option).
func rsyncLocalCopy(source string, dest string, bwlimit string) (string, error) {
err := os.MkdirAll(dest, 0755)
if err != nil {
return "", err
}

rsyncVerbosity := "-q"
if debug {
rsyncVerbosity = "-vi"
}

if bwlimit == "" {
bwlimit = "0"
}

return shared.RunCommand("rsync",
"-a",
"-HAX",
"--devices",
"--delete",
"--checksum",
"--numeric-ids",
"--bwlimit", bwlimit,
rsyncVerbosity,
shared.AddSlash(source),
dest)
}

func rsyncSendSetup(path string, bwlimit string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {
/*
* It's sort of unfortunate, but there's no library call to get a
* temporary name, so we get the file and close it and use its name.
Expand Down Expand Up @@ -57,16 +86,21 @@ func rsyncSendSetup(path string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {
* hardcoding that at the other end, so we can just ignore it.
*/
rsyncCmd := fmt.Sprintf("sh -c \"%s netcat %s\"", execPath, f.Name())
cmd := exec.Command(
"rsync",
if bwlimit == "" {
bwlimit = "0"
}

cmd := exec.Command("rsync",
"-arvP",
"--devices",
"--numeric-ids",
"--partial",
path,
"localhost:/tmp/foo",
"-e",
rsyncCmd)
rsyncCmd,
"--bwlimit",
bwlimit)

stderr, err := cmd.StderrPipe()
if err != nil {
Expand All @@ -90,8 +124,8 @@ func rsyncSendSetup(path string) (*exec.Cmd, net.Conn, io.ReadCloser, error) {

// RsyncSend sets up the sending half of an rsync, to recursively send the
// directory pointed to by path over the websocket.
func RsyncSend(path string, conn *websocket.Conn, readWrapper func(io.ReadCloser) io.ReadCloser) error {
cmd, dataSocket, stderr, err := rsyncSendSetup(path)
func RsyncSend(path string, conn *websocket.Conn, readWrapper func(io.ReadCloser) io.ReadCloser, bwlimit string) error {
cmd, dataSocket, stderr, err := rsyncSendSetup(path, bwlimit)
if err != nil {
return err
}
Expand Down
27 changes: 0 additions & 27 deletions lxd/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,33 +112,6 @@ func filesystemDetect(path string) (string, error) {
}
}

// storageRsyncCopy copies a directory using rsync (with the --devices option).
func storageRsyncCopy(source string, dest string) (string, error) {
err := os.MkdirAll(dest, 0755)
if err != nil {
return "", err
}

rsyncVerbosity := "-q"
if debug {
rsyncVerbosity = "-vi"
}

output, err := shared.RunCommand(
"rsync",
"-a",
"-HAX",
"--devices",
"--delete",
"--checksum",
"--numeric-ids",
rsyncVerbosity,
shared.AddSlash(source),
dest)

return output, err
}

// storageType defines the type of a storage
type storageType int

Expand Down
13 changes: 9 additions & 4 deletions lxd/storage_btrfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,11 @@ func (s *storageBtrfs) StoragePoolUmount() (bool, error) {
}

func (s *storageBtrfs) StoragePoolUpdate(writable *api.StoragePoolPut, changedConfig []string) error {
return fmt.Errorf("Btrfs storage properties cannot be changed.")
if shared.StringInSlice("rsync.bwlimit", changedConfig) {
return nil
}

return fmt.Errorf("Storage property cannot be changed.")
}

func (s *storageBtrfs) GetStoragePoolWritable() api.StoragePoolPut {
Expand Down Expand Up @@ -1005,7 +1009,8 @@ func (s *storageBtrfs) ContainerRestore(container container, sourceContainer con
if err == nil {
// Use rsync to fill the empty volume. Sync by using
// the subvolume name.
output, err := storageRsyncCopy(sourceContainerSubvolumeName, targetContainerSubvolumeName)
bwlimit := s.pool.Config["rsync.bwlimit"]
output, err := rsyncLocalCopy(sourceContainerSubvolumeName, targetContainerSubvolumeName, bwlimit)
if err != nil {
s.ContainerDelete(container)
logger.Errorf("ContainerRestore: rsync failed: %s.", string(output))
Expand Down Expand Up @@ -1743,7 +1748,7 @@ func (s *btrfsMigrationSourceDriver) send(conn *websocket.Conn, btrfsPath string
return err
}

func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation) error {
func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *operation, bwlimit string) error {
_, containerPool := s.container.Storage().GetContainerPoolInfo()
containerName := s.container.Name()
containersPath := getContainerMountPoint(containerPool, "")
Expand Down Expand Up @@ -1821,7 +1826,7 @@ func (s *btrfsMigrationSourceDriver) SendWhileRunning(conn *websocket.Conn, op *
return s.send(conn, migrationSendSnapshot, btrfsParent, wrapper)
}

func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn) error {
func (s *btrfsMigrationSourceDriver) SendAfterCheckpoint(conn *websocket.Conn, bwlimit string) error {
tmpPath := containerPath(fmt.Sprintf("%s/.migration-send", s.container.Name()), true)
err := os.MkdirAll(tmpPath, 0700)
if err != nil {
Expand Down
24 changes: 16 additions & 8 deletions lxd/storage_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ func (s *storageDir) GetContainerPoolInfo() (int64, string) {
}

func (s *storageDir) StoragePoolUpdate(writable *api.StoragePoolPut, changedConfig []string) error {
return fmt.Errorf("Dir storage properties cannot be changed.")
if shared.StringInSlice("rsync.bwlimit", changedConfig) {
return nil
}

return fmt.Errorf("Storage property cannot be changed.")
}

// Functions dealing with storage pools.
Expand Down Expand Up @@ -367,7 +371,8 @@ func (s *storageDir) copyContainer(target container, source container) error {
return err
}

output, err := storageRsyncCopy(sourceContainerMntPoint, targetContainerMntPoint)
bwlimit := s.pool.Config["rsync.bwlimit"]
output, err := rsyncLocalCopy(sourceContainerMntPoint, targetContainerMntPoint, bwlimit)
if err != nil {
return fmt.Errorf("Failed to rsync container: %s: %s.", string(output), err)
}
Expand Down Expand Up @@ -400,7 +405,8 @@ func (s *storageDir) copySnapshot(target container, source container) error {
return err
}

output, err := storageRsyncCopy(sourceContainerMntPoint, targetContainerMntPoint)
bwlimit := s.pool.Config["rsync.bwlimit"]
output, err := rsyncLocalCopy(sourceContainerMntPoint, targetContainerMntPoint, bwlimit)
if err != nil {
return fmt.Errorf("Failed to rsync container: %s: %s.", string(output), err)
}
Expand Down Expand Up @@ -533,7 +539,8 @@ func (s *storageDir) ContainerRestore(container container, sourceContainer conta
sourcePath := sourceContainer.Path()

// Restore using rsync
output, err := storageRsyncCopy(sourcePath, targetPath)
bwlimit := s.pool.Config["rsync.bwlimit"]
output, err := rsyncLocalCopy(sourcePath, targetPath, bwlimit)
if err != nil {
return fmt.Errorf("Failed to rsync container: %s: %s.", string(output), err)
}
Expand Down Expand Up @@ -566,8 +573,8 @@ func (s *storageDir) ContainerSnapshotCreate(snapshotContainer container, source
return err
}

rsync := func(snapshotContainer container, oldPath string, newPath string) error {
output, err := storageRsyncCopy(oldPath, newPath)
rsync := func(snapshotContainer container, oldPath string, newPath string, bwlimit string) error {
output, err := rsyncLocalCopy(oldPath, newPath, bwlimit)
if err != nil {
s.ContainerDelete(snapshotContainer)
return fmt.Errorf("Failed to rsync: %s: %s.", string(output), err)
Expand All @@ -586,7 +593,8 @@ func (s *storageDir) ContainerSnapshotCreate(snapshotContainer container, source
_, sourcePool := sourceContainer.Storage().GetContainerPoolInfo()
sourceContainerName := sourceContainer.Name()
sourceContainerMntPoint := getContainerMountPoint(sourcePool, sourceContainerName)
err = rsync(snapshotContainer, sourceContainerMntPoint, targetContainerMntPoint)
bwlimit := s.pool.Config["rsync.bwlimit"]
err = rsync(snapshotContainer, sourceContainerMntPoint, targetContainerMntPoint, bwlimit)
if err != nil {
return err
}
Expand All @@ -602,7 +610,7 @@ func (s *storageDir) ContainerSnapshotCreate(snapshotContainer container, source
return nil
}

err = rsync(snapshotContainer, sourceContainerMntPoint, targetContainerMntPoint)
err = rsync(snapshotContainer, sourceContainerMntPoint, targetContainerMntPoint, bwlimit)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 47f4509

Please sign in to comment.