Skip to content

Commit

Permalink
Merge pull request #13198 from roosterfish/rbd_refresh_revert
Browse files Browse the repository at this point in the history
Storage: Use reverter for Ceph RBD volume refresh
  • Loading branch information
tomponline authored Mar 25, 2024
2 parents 5f23ba3 + 084e441 commit 24fb895
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 94 deletions.
3 changes: 2 additions & 1 deletion lxd/storage/drivers/driver_btrfs_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,8 @@ func (d *btrfs) CreateVolumeFromCopy(vol VolumeCopy, srcVol VolumeCopy, allowInc
func (d *btrfs) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) error {
// Handle simple rsync and block_and_rsync through generic.
if volTargetArgs.MigrationType.FSType == migration.MigrationFSType_RSYNC || volTargetArgs.MigrationType.FSType == migration.MigrationFSType_BLOCK_AND_RSYNC {
return genericVFSCreateVolumeFromMigration(d, nil, vol, conn, volTargetArgs, preFiller, op)
_, err := genericVFSCreateVolumeFromMigration(d, nil, vol, conn, volTargetArgs, preFiller, op)
return err
} else if volTargetArgs.MigrationType.FSType != migration.MigrationFSType_BTRFS {
return ErrNotSupported
}
Expand Down
203 changes: 135 additions & 68 deletions lxd/storage/drivers/driver_ceph_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,46 +520,13 @@ func (d *ceph) CreateVolumeFromCopy(vol VolumeCopy, srcVol VolumeCopy, allowInco
}

// CreateVolumeFromMigration creates a volume being sent via a migration.
func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) error {
if volTargetArgs.ClusterMoveSourceName != "" {
err := vol.EnsureMountPath()
if err != nil {
return err
}

if vol.IsVMBlock() {
fsVol := NewVolumeCopy(vol.NewVMBlockFilesystemVolume())
err := d.CreateVolumeFromMigration(fsVol, conn, volTargetArgs, preFiller, op)
if err != nil {
return err
}
}

return nil
}

// It returns the cleanup hooks required to revert any changes made during the migration.
func (d *ceph) createVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) (revert.Hook, error) {
// Handle simple rsync and block_and_rsync through generic.
if shared.ValueInSlice(volTargetArgs.MigrationType.FSType, []migration.MigrationFSType{migration.MigrationFSType_RSYNC, migration.MigrationFSType_BLOCK_AND_RSYNC}) || volTargetArgs.MigrationType.FSType == migration.MigrationFSType_RBD_AND_RSYNC && vol.contentType == ContentTypeFS {
return genericVFSCreateVolumeFromMigration(d, nil, vol, conn, volTargetArgs, preFiller, op)
} else if !shared.ValueInSlice(volTargetArgs.MigrationType.FSType, []migration.MigrationFSType{migration.MigrationFSType_RBD, migration.MigrationFSType_RBD_AND_RSYNC}) {
return ErrNotSupported
}

// Migrate (receive) the VMs filesystem volume too.
// This will recursively call this function again and fall back to the generic way of refreshing.
if vol.IsVMBlock() {
// Ensure that the volume's snapshots are also replaced with their filesystem counterpart.
fsVolSnapshots := make([]Volume, 0, len(vol.Snapshots))
for _, snapshot := range vol.Snapshots {
fsVolSnapshots = append(fsVolSnapshots, snapshot.NewVMBlockFilesystemVolume())
}

fsVolCopy := NewVolumeCopy(vol.NewVMBlockFilesystemVolume(), fsVolSnapshots...)

err := d.CreateVolumeFromMigration(fsVolCopy, conn, volTargetArgs, preFiller, op)
if err != nil {
return err
}
return nil, ErrNotSupported
}

var lastCommonSnapshotName string
Expand All @@ -585,7 +552,7 @@ func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser
if lastCommonSnapshotFound {
ok, err := d.hasVolume(d.getRBDVolumeName(vol.Volume, fmt.Sprintf("snapshot_%s", targetSnapshotName), false, false))
if err != nil {
return err
return nil, err
}

// The snapshot does not exist on the target.
Expand All @@ -597,7 +564,7 @@ func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser
// This happens if not the latest snapshot on the target side gets deleted and requires refresh.
_, err = d.deleteVolumeSnapshot(vol.Volume, fmt.Sprintf("snapshot_%s", targetSnapshotName))
if err != nil {
return err
return nil, err
}
}
}
Expand All @@ -608,7 +575,7 @@ func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser
// between the latest snapshot and source volume and apply it on the target volume.
err := d.restoreVolume(vol.Volume, vol.Snapshots[lastCommonSnapshotIndex], op)
if err != nil {
return err
return nil, err
}
} else {
// In case of refresh first delete the already existing volume.
Expand All @@ -617,7 +584,7 @@ func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser
// Delete the volume as we will create a new sparse copy.
_, err := d.deleteVolume(vol.Volume)
if err != nil {
return err
return nil, err
}
}

Expand All @@ -627,15 +594,18 @@ func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser
// if the volume is of type snapshot, it will get recreated later from copy.
err := d.rbdCreateVolume(vol.Volume, vol.ConfigSize())
if err != nil {
return err
return nil, err
}
}

err := vol.Volume.EnsureMountPath()
if err != nil {
return err
return nil, err
}

revert := revert.New()
defer revert.Fail()

targetVolumeName := d.getRBDVolumeName(vol.Volume, "", false, true)

if len(volTargetArgs.Snapshots) > 0 {
Expand All @@ -661,18 +631,22 @@ func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser

err := d.receiveVolume(targetVolumeName, conn, wrapper)
if err != nil {
return err
return nil, err
}

snapVol, err := vol.NewSnapshot(targetSnapshotName)
if err != nil {
return err
return nil, err
}

err = snapVol.EnsureMountPath()
if err != nil {
return err
return nil, err
}

// Ensure to cleanup the snapshot on the target volume in case of error.
// When retrying the migration there shouldn't be any left over snapshot from before.
revert.Add(func() { _, _ = d.deleteVolumeSnapshot(vol.Volume, fmt.Sprintf("snapshot_%s", targetSnapshotName)) })
}
}

Expand All @@ -695,37 +669,77 @@ func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser
wrapper := migration.ProgressWriter(op, "fs_progress", vol.name)

// Apply the diff.
return d.receiveVolume(targetVolumeName, conn, wrapper)
err = d.receiveVolume(targetVolumeName, conn, wrapper)
if err != nil {
return nil, err
}

cleanup := revert.Clone().Fail
revert.Success()
return cleanup, nil
}

// RefreshVolume updates an existing volume to match the state of another.
func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots []string, allowInconsistent bool, op *operations.Operation) error {
// Copy volumes with content type filesystem using the generic approach.
if vol.contentType == ContentTypeFS {
return genericVFSCopyVolume(d, nil, vol, srcVol, refreshSnapshots, true, allowInconsistent, op)
}
// CreateVolumeFromMigration creates a volume being sent via a migration.
func (d *ceph) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) error {
if volTargetArgs.ClusterMoveSourceName != "" {
err := vol.EnsureMountPath()
if err != nil {
return err
}

// Refresh the VMs filesystem volume too.
// This will recursively call this function again and fall back to the generic way of refreshing.
if srcVol.IsVMBlock() {
// Ensure that the volume's snapshots are also replaced with their filesystem counterpart.
srcFsVolSnapshots := make([]Volume, 0, len(srcVol.Snapshots))
for _, snapshot := range srcVol.Snapshots {
srcFsVolSnapshots = append(srcFsVolSnapshots, snapshot.NewVMBlockFilesystemVolume())
if vol.IsVMBlock() {
fsVol := NewVolumeCopy(vol.NewVMBlockFilesystemVolume())
err := d.CreateVolumeFromMigration(fsVol, conn, volTargetArgs, preFiller, op)
if err != nil {
return err
}
}

return nil
}

revert := revert.New()
defer revert.Fail()

// Migrate (receive) the VMs filesystem volume too.
// This will fall back to the generic way of refreshing.
if vol.IsVMBlock() {
// Ensure that the volume's snapshots are also replaced with their filesystem counterpart.
fsVolSnapshots := make([]Volume, 0, len(vol.Snapshots))
for _, snapshot := range vol.Snapshots {
fsVolSnapshots = append(fsVolSnapshots, snapshot.NewVMBlockFilesystemVolume())
}

srcFsVolCopy := NewVolumeCopy(srcVol.NewVMBlockFilesystemVolume(), srcFsVolSnapshots...)
fsVolCopy := NewVolumeCopy(vol.NewVMBlockFilesystemVolume(), fsVolSnapshots...)

err := d.RefreshVolume(fsVolCopy, srcFsVolCopy, refreshSnapshots, allowInconsistent, op)
// Migrate the VMs filesystem volume and record the cleanup hooks.
// This allows cleaning up any changes made during the generic migration.
cleanup, err := d.createVolumeFromMigration(fsVolCopy, conn, volTargetArgs, preFiller, op)
if err != nil {
return err
}

revert.Add(cleanup)
}

// Migrate the actual volume and record the cleanup hooks.
cleanup, err := d.createVolumeFromMigration(vol, conn, volTargetArgs, preFiller, op)
if err != nil {
return err
}

revert.Add(cleanup)

revert.Success()
return nil
}

// refreshVolume updates an existing volume to match the state of another.
// It returns the cleanup hooks required to revert any changes made during the refresh.
func (d *ceph) refreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots []string, allowInconsistent bool, op *operations.Operation) (revert.Hook, error) {
// Copy volumes with content type filesystem using the generic approach.
if vol.contentType == ContentTypeFS {
return genericVFSCopyVolume(d, nil, vol, srcVol, refreshSnapshots, true, allowInconsistent, op)
}

var lastCommonSnapshotName string
Expand All @@ -751,7 +765,7 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
if lastCommonSnapshotFound {
ok, err := d.hasVolume(d.getRBDVolumeName(vol.Volume, fmt.Sprintf("snapshot_%s", targetSnapshotName), false, false))
if err != nil {
return err
return nil, err
}

// The snapshot does not exist on the target.
Expand All @@ -765,7 +779,7 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
// It already got refreshed using the generic approach.
_, err = d.deleteVolumeSnapshot(vol.Volume, fmt.Sprintf("snapshot_%s", targetSnapshotName))
if err != nil {
return err
return nil, err
}
}
}
Expand All @@ -778,7 +792,7 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
// It already got refreshed using the generic approach.
err := d.restoreVolume(vol.Volume, vol.Snapshots[lastCommonSnapshotIndex], op)
if err != nil {
return err
return nil, err
}
} else {
// There isn't a common snapshot on the target volume.
Expand All @@ -787,7 +801,7 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
// It already got refreshed using the generic approach.
_, err := d.deleteVolume(vol.Volume)
if err != nil {
return err
return nil, err
}

// Recreate the volume.
Expand All @@ -799,7 +813,7 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
// It already got refreshed using the generic approach.
err := d.rbdCreateVolume(vol.Volume, vol.ConfigSize())
if err != nil {
return err
return nil, err
}
}
}
Expand All @@ -808,7 +822,7 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
// The target volume was just deleted in the step before
// as there isn't any common snapshot when refreshing a volume from a snapshot.
// Simply copy the source volume again to the target.
return d.CreateVolumeFromCopy(vol, srcVol, allowInconsistent, op)
return nil, d.CreateVolumeFromCopy(vol, srcVol, allowInconsistent, op)
}

// Refreshes the targetVol by applying the sourceVol.
Expand All @@ -827,6 +841,9 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
return d.copyVolumeDiff(fullSourceSnapName, fullTargetVolName, sourceParentSnap)
}

revert := revert.New()
defer revert.Fail()

var lastSnap string

// Create all missing snapshots on the target using an incremental stream.
Expand Down Expand Up @@ -864,8 +881,12 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots

err := refresh(sourceSnapshot, vol.Volume, sourceParentSnapshotName)
if err != nil {
return err
return nil, err
}

// Ensure to cleanup the snapshot on the target volume in case of error.
// When retrying the refresh there shouldn't be any left over snapshot from before.
revert.Add(func() { _, _ = d.deleteVolumeSnapshot(vol.Volume, fmt.Sprintf("snapshot_%s", sourceSnapshotName)) })
}
}

Expand All @@ -882,10 +903,56 @@ func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots
// Apply the diff on the target volume.
// If commonSnap is set only the diff from the last common snapshot gets refreshed.
err := refresh(srcVol.Volume, vol.Volume, lastSnap)
if err != nil {
return nil, err
}

cleanup := revert.Clone().Fail
revert.Success()
return cleanup, nil
}

// RefreshVolume updates an existing volume to match the state of another.
func (d *ceph) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots []string, allowInconsistent bool, op *operations.Operation) error {
revert := revert.New()
defer revert.Fail()

// Refresh the VMs filesystem volume too.
// This will fall back to the generic way of refreshing.
if srcVol.IsVMBlock() {
// Ensure that the volume's snapshots are also replaced with their filesystem counterpart.
srcFsVolSnapshots := make([]Volume, 0, len(srcVol.Snapshots))
for _, snapshot := range srcVol.Snapshots {
srcFsVolSnapshots = append(srcFsVolSnapshots, snapshot.NewVMBlockFilesystemVolume())
}

fsVolSnapshots := make([]Volume, 0, len(vol.Snapshots))
for _, snapshot := range vol.Snapshots {
fsVolSnapshots = append(fsVolSnapshots, snapshot.NewVMBlockFilesystemVolume())
}

srcFsVolCopy := NewVolumeCopy(srcVol.NewVMBlockFilesystemVolume(), srcFsVolSnapshots...)
fsVolCopy := NewVolumeCopy(vol.NewVMBlockFilesystemVolume(), fsVolSnapshots...)

// Refresh the VMs filesystem volume and record the cleanup hooks.
// This allows cleaning up any changes made during the generic refresh.
cleanup, err := d.refreshVolume(fsVolCopy, srcFsVolCopy, refreshSnapshots, allowInconsistent, op)
if err != nil {
return err
}

revert.Add(cleanup)
}

// Refresh the actual volume and record the cleanup hooks.
cleanup, err := d.refreshVolume(vol, srcVol, refreshSnapshots, allowInconsistent, op)
if err != nil {
return err
}

revert.Add(cleanup)

revert.Success()
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions lxd/storage/drivers/driver_dir_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,20 @@ func (d *dir) CreateVolumeFromCopy(vol VolumeCopy, srcVol VolumeCopy, allowIncon
}

// Run the generic copy.
return genericVFSCopyVolume(d, d.setupInitialQuota, vol, srcVol, srcSnapshots, false, allowInconsistent, op)
_, err := genericVFSCopyVolume(d, d.setupInitialQuota, vol, srcVol, srcSnapshots, false, allowInconsistent, op)
return err
}

// CreateVolumeFromMigration creates a volume being sent via a migration.
func (d *dir) CreateVolumeFromMigration(vol VolumeCopy, conn io.ReadWriteCloser, volTargetArgs migration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) error {
return genericVFSCreateVolumeFromMigration(d, d.setupInitialQuota, vol, conn, volTargetArgs, preFiller, op)
_, err := genericVFSCreateVolumeFromMigration(d, d.setupInitialQuota, vol, conn, volTargetArgs, preFiller, op)
return err
}

// RefreshVolume provides same-pool volume and specific snapshots syncing functionality.
func (d *dir) RefreshVolume(vol VolumeCopy, srcVol VolumeCopy, refreshSnapshots []string, allowInconsistent bool, op *operations.Operation) error {
return genericVFSCopyVolume(d, d.setupInitialQuota, vol, srcVol, refreshSnapshots, true, allowInconsistent, op)
_, err := genericVFSCopyVolume(d, d.setupInitialQuota, vol, srcVol, refreshSnapshots, true, allowInconsistent, op)
return err
}

// DeleteVolume deletes a volume of the storage device. If any snapshots of the volume remain then
Expand Down
Loading

0 comments on commit 24fb895

Please sign in to comment.