diff --git a/client/lxd_storage_volumes.go b/client/lxd_storage_volumes.go
index a12f6fbbc532..5367358ba388 100644
--- a/client/lxd_storage_volumes.go
+++ b/client/lxd_storage_volumes.go
@@ -555,8 +555,10 @@ func (r *ProtocolLXD) CopyStoragePoolVolume(pool string, source InstanceServer,
         return nil, fmt.Errorf("Failed to get destination connection info: %w", err)
     }
 
+    clusterInternalVolumeCopy := r.CheckExtension("cluster_internal_custom_volume_copy") == nil
+
     // Copy the storage pool volume locally.
-    if destInfo.URL == sourceInfo.URL && destInfo.SocketPath == sourceInfo.SocketPath && (volume.Location == r.clusterTarget || (volume.Location == "none" && r.clusterTarget == "")) {
+    if destInfo.URL == sourceInfo.URL && destInfo.SocketPath == sourceInfo.SocketPath && (volume.Location == r.clusterTarget || (volume.Location == "none" && r.clusterTarget == "") || clusterInternalVolumeCopy) {
         // Project handling
         if destInfo.Project != sourceInfo.Project {
             if !r.HasExtension("storage_api_project") {
@@ -566,6 +568,10 @@ func (r *ProtocolLXD) CopyStoragePoolVolume(pool string, source InstanceServer,
             req.Source.Project = sourceInfo.Project
         }
 
+        if clusterInternalVolumeCopy {
+            req.Source.Location = sourceInfo.Target
+        }
+
         // Send the request
         op, _, err := r.queryOperation("POST", fmt.Sprintf("/storage-pools/%s/volumes/%s", url.PathEscape(pool), url.PathEscape(volume.Type)), req, "", true)
         if err != nil {
diff --git a/doc/api-extensions.md b/doc/api-extensions.md
index 79828ccaff29..26290cc41d35 100644
--- a/doc/api-extensions.md
+++ b/doc/api-extensions.md
@@ -2312,3 +2312,9 @@ no effect on existing devices.
 
 This API extension indicates that the `/1.0/operations/{id}/wait` endpoint exists on the server.
 This indicates to the client that the endpoint can be used to wait for an operation to complete rather than waiting for an operation event via the `/1.0/events` endpoint.
+
+## `cluster_internal_custom_volume_copy`
+
+This extension adds support for copying and moving custom storage volumes within a cluster with a single API call.
+Calling `POST /1.0/storage-pools/<pool>/custom?target=<target>` will copy the custom volume specified in the `source` part of the request.
+Calling `POST /1.0/storage-pools/<pool>/custom/<volume>?target=<target>` will move the custom volume from the source, specified in the `source` part of the request, to the target.
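For orientation (not part of the patch): a minimal sketch of the request body a client could send for a cluster-internal copy once this extension is present. The pool, volume and member names (`default`, `vol1`, `lxd01`, `lxd02`) are made up, and the endpoint in the comment mirrors the client code above.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/canonical/lxd/shared/api"
)

func main() {
	// Sent as: POST /1.0/storage-pools/default/volumes/custom?target=lxd02
	req := api.StorageVolumesPost{
		Name: "vol1-copy",
		Type: "custom",
		Source: api.StorageVolumeSource{
			Type:     "copy",
			Pool:     "default",
			Name:     "vol1",
			Location: "lxd01", // Cluster member currently hosting the source volume.
		},
	}

	body, _ := json.MarshalIndent(req, "", "  ")
	fmt.Println(string(body))
}
```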
diff --git a/doc/rest-api.yaml b/doc/rest-api.yaml
index 27fa9695d81c..56b8f697d8c4 100644
--- a/doc/rest-api.yaml
+++ b/doc/rest-api.yaml
@@ -5827,6 +5827,8 @@ definitions:
         example: foo
         type: string
         x-go-name: Project
+      source:
+        $ref: '#/definitions/StorageVolumeSource'
       target:
         $ref: '#/definitions/StorageVolumePostTarget'
       volume_only:
@@ -5981,6 +5983,11 @@ definitions:
         example: X509 PEM certificate
         type: string
         x-go-name: Certificate
+      location:
+        description: What cluster member this record was found on
+        example: lxd01
+        type: string
+        x-go-name: Location
       mode:
         description: Whether to use pull or push mode (for migration)
         example: pull
diff --git a/lxd/db/storage_volumes.go b/lxd/db/storage_volumes.go
index 73e84b0793ef..1fe3942ffddd 100644
--- a/lxd/db/storage_volumes.go
+++ b/lxd/db/storage_volumes.go
@@ -953,3 +953,34 @@ func (c *ClusterTx) GetStorageVolumeURIs(ctx context.Context, project string) ([
 
     return uris, nil
 }
+
+// UpdateStorageVolumeNode changes the name of a storage volume and the cluster member hosting it.
+// It's meant to be used when moving a storage volume backed by ceph from one cluster node to another.
+func (c *ClusterTx) UpdateStorageVolumeNode(ctx context.Context, projectName string, oldName string, newName string, newMemberName string, poolID int64, volumeType int) error {
+    volume, err := c.GetStoragePoolVolume(ctx, poolID, projectName, volumeType, oldName, false)
+    if err != nil {
+        return err
+    }
+
+    member, err := c.GetNodeByName(ctx, newMemberName)
+    if err != nil {
+        return fmt.Errorf("Failed to get new member %q info: %w", newMemberName, err)
+    }
+
+    stmt := "UPDATE storage_volumes SET node_id=?, name=? WHERE id=?"
+    result, err := c.tx.Exec(stmt, member.ID, newName, volume.ID)
+    if err != nil {
+        return fmt.Errorf("Failed to update volume's name and member ID: %w", err)
+    }
+
+    n, err := result.RowsAffected()
+    if err != nil {
+        return fmt.Errorf("Failed to get rows affected by volume update: %w", err)
+    }
+
+    if n != 1 {
+        return fmt.Errorf("Unexpected number of updated rows in storage_volumes table: %d", n)
+    }
+
+    return nil
+}
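To show how the new helper might be driven from the ceph move path, here is a sketch; the surrounding wiring and the names (`default`, `vol1`) are assumptions for illustration, not code from the patch.

```go
package example

import (
	"context"

	"github.com/canonical/lxd/lxd/db"
)

// reassignCephVolume points an existing ceph-backed custom volume record at a
// new cluster member; the storage itself is shared, so only the DB row moves.
func reassignCephVolume(ctx context.Context, cluster *db.Cluster, poolID int64, newMember string) error {
	return cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
		// Keep the volume name, only change the owning member.
		return tx.UpdateStorageVolumeNode(ctx, "default", "vol1", "vol1", newMember, poolID, db.StoragePoolVolumeTypeCustom)
	})
}
```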
diff --git a/lxd/storage_volumes.go b/lxd/storage_volumes.go
index 865897c3bce8..da1c69625ec6 100644
--- a/lxd/storage_volumes.go
+++ b/lxd/storage_volumes.go
@@ -25,6 +25,7 @@ import (
     lxdCluster "github.com/canonical/lxd/lxd/cluster"
     "github.com/canonical/lxd/lxd/db"
     "github.com/canonical/lxd/lxd/db/cluster"
+    dbCluster "github.com/canonical/lxd/lxd/db/cluster"
     "github.com/canonical/lxd/lxd/db/operationtype"
     "github.com/canonical/lxd/lxd/instance"
     "github.com/canonical/lxd/lxd/operations"
@@ -641,6 +642,39 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) response.Response {
         return response.Conflict(fmt.Errorf("Volume by that name already exists"))
     }
 
+    target := queryParam(r, "target")
+
+    // Check if we need to switch to migration
+    clustered, err := lxdCluster.Enabled(s.DB.Node)
+    if err != nil {
+        return response.SmartError(err)
+    }
+
+    serverName := s.ServerName
+    var nodeAddress string
+
+    if clustered && target != "" && !req.Source.Refresh && (req.Source.Location != "" && serverName != req.Source.Location) {
+        err := s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error {
+            nodeInfo, err := tx.GetNodeByName(ctx, req.Source.Location)
+            if err != nil {
+                return err
+            }
+
+            nodeAddress = nodeInfo.Address
+
+            return nil
+        })
+        if err != nil {
+            return response.SmartError(err)
+        }
+
+        if nodeAddress == "" {
+            return response.BadRequest(fmt.Errorf("The source is currently offline"))
+        }
+
+        return clusterCopyCustomVolumeInternal(s, r, nodeAddress, projectName, poolName, &req)
+    }
+
     switch req.Source.Type {
     case "":
         return doVolumeCreateOrCopy(s, r, request.ProjectParam(r), projectName, poolName, &req)
@@ -657,6 +691,49 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) response.Response {
     }
 }
 
+func clusterCopyCustomVolumeInternal(s *state.State, r *http.Request, sourceAddress string, projectName string, poolName string, req *api.StorageVolumesPost) response.Response {
+    websockets := map[string]string{}
+
+    client, err := lxdCluster.Connect(sourceAddress, s.Endpoints.NetworkCert(), s.ServerCert(), r, false)
+    if err != nil {
+        return response.SmartError(err)
+    }
+
+    client = client.UseProject(req.Source.Project)
+
+    pullReq := api.StorageVolumePost{
+        Name:       req.Source.Name,
+        Pool:       req.Source.Pool,
+        Migration:  true,
+        VolumeOnly: req.Source.VolumeOnly,
+        Project:    req.Source.Project,
+        Source: api.StorageVolumeSource{
+            Location: req.Source.Location,
+        },
+    }
+
+    op, err := client.MigrateStoragePoolVolume(req.Source.Pool, pullReq)
+    if err != nil {
+        return response.SmartError(err)
+    }
+
+    opAPI := op.Get()
+
+    for k, v := range opAPI.Metadata {
+        websockets[k] = v.(string)
+    }
+
+    // Reset the source for a migration
+    req.Source.Type = "migration"
+    req.Source.Certificate = string(s.Endpoints.NetworkCert().PublicKey())
+    req.Source.Mode = "pull"
+    req.Source.Operation = fmt.Sprintf("https://%s/%s/operations/%s", sourceAddress, version.APIVersion, opAPI.ID)
+    req.Source.Websockets = websockets
+    req.Source.Project = ""
+
+    return doVolumeMigration(s, r, req.Source.Project, projectName, poolName, req)
+}
+
 func doCustomVolumeRefresh(s *state.State, r *http.Request, requestProjectName string, projectName string, poolName string, req *api.StorageVolumesPost) response.Response {
     var run func(op *operations.Operation) error
 
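For orientation, roughly what `req.Source` looks like after `clusterCopyCustomVolumeInternal` rewrites the copy into a pull-mode migration before handing it to `doVolumeMigration`; the address, operation UUID, secrets and certificate below are invented.

```go
package example

import "github.com/canonical/lxd/shared/api"

// rewrittenSource approximates the migration source built from the source
// member's MigrateStoragePoolVolume operation metadata.
var rewrittenSource = api.StorageVolumeSource{
	Type:        "migration",
	Mode:        "pull",
	Operation:   "https://10.0.0.11:8443/1.0/operations/8d9c5f8e-1234-4a5b-9c0d-abcdefabcdef",
	Certificate: "-----BEGIN CERTIFICATE-----\n(source cluster certificate)\n-----END CERTIFICATE-----\n",
	Websockets: map[string]string{
		"control": "randomly-generated-secret",
		"fs":      "randomly-generated-secret",
	},
}
```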
@@ -1075,6 +1152,148 @@ func storagePoolVolumePost(d *Daemon, r *http.Request) response.Response {
 
     r.Body = shared.BytesReadCloser{Buf: &buf}
 
+    target := queryParam(r, "target")
+
+    // Check if clustered.
+    clustered, err := lxdCluster.Enabled(s.DB.Node)
+    if err != nil {
+        return response.InternalError(fmt.Errorf("Failed checking cluster state: %w", err))
+    }
+
+    if clustered && target != "" && req.Source.Location != "" && req.Migration {
+        var sourceNodeOffline bool
+
+        err = s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error {
+            // Load source node.
+            nodeInfo, err := tx.GetNodeByName(ctx, req.Source.Location)
+            if err != nil {
+                return err
+            }
+
+            sourceAddress := nodeInfo.Address
+
+            if sourceAddress == "" {
+                // Local node.
+                sourceNodeOffline = false
+                return nil
+            }
+
+            sourceMemberInfo, err := tx.GetNodeByAddress(ctx, sourceAddress)
+            if err != nil {
+                return fmt.Errorf("Failed to get source member for %q: %w", sourceAddress, err)
+            }
+
+            sourceNodeOffline = sourceMemberInfo.IsOffline(s.GlobalConfig.OfflineThreshold())
+
+            return nil
+        })
+        if err != nil {
+            return response.SmartError(err)
+        }
+
+        var targetProject *api.Project
+        var targetMemberInfo *db.NodeInfo
+
+        if sourceNodeOffline {
+            resp := forwardedResponseIfTargetIsRemote(s, r)
+            if resp != nil {
+                return resp
+            }
+
+            srcPool, err := storagePools.LoadByName(s, srcPoolName)
+            if err != nil {
+                return response.SmartError(err)
+            }
+
+            if srcPool.Driver().Info().Name == "ceph" {
+                // Load source volume.
+                srcPoolID, err := s.DB.Cluster.GetStoragePoolID(srcPoolName)
+                if err != nil {
+                    return response.SmartError(err)
+                }
+
+                var dbVolume *db.StorageVolume
+
+                err = s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error {
+                    dbVolume, err = tx.GetStoragePoolVolume(ctx, srcPoolID, projectName, db.StoragePoolVolumeTypeCustom, volumeName, true)
+                    return err
+                })
+                if err != nil {
+                    // Check if the user provided an incorrect target query parameter and return a helpful error message.
+                    _, volumeNotFound := api.StatusErrorMatch(err, http.StatusNotFound)
+                    targetIsSet := r.URL.Query().Get("target") != ""
+                    serverIsClustered, _ := lxdCluster.Enabled(s.DB.Node)
+
+                    if serverIsClustered && targetIsSet && volumeNotFound {
+                        return response.NotFound(fmt.Errorf("Storage volume not found on this cluster member"))
+                    }
+
+                    return response.SmartError(err)
+                }
+
+                req := api.StorageVolumePost{
+                    Name: req.Name,
+                }
+
+                return storagePoolVolumeTypePostRename(s, r, srcPool.Name(), projectName, &dbVolume.StorageVolume, req)
+            }
+        } else {
+            resp := forwardedResponseToNode(s, r, req.Source.Location)
+            if resp != nil {
+                return resp
+            }
+        }
+
+        err = s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error {
+            p, err := dbCluster.GetProject(ctx, tx.Tx(), projectName)
+            if err != nil {
+                return err
+            }
+
+            targetProject, err = p.ToAPI(ctx, tx.Tx())
+            if err != nil {
+                return err
+            }
+
+            allMembers, err := tx.GetNodes(ctx)
+            if err != nil {
+                return fmt.Errorf("Failed getting cluster members: %w", err)
+            }
+
+            targetMemberInfo, _, err = project.CheckTarget(ctx, s.Authorizer, r, tx, targetProject, target, allMembers)
+            if err != nil {
+                return err
+            }
+
+            if targetMemberInfo == nil {
+                return fmt.Errorf("Failed checking cluster member %q", target)
+            }
+
+            return nil
+        })
+        if err != nil {
+            return response.SmartError(err)
+        }
+
+        if targetMemberInfo.IsOffline(s.GlobalConfig.OfflineThreshold()) {
+            return response.BadRequest(fmt.Errorf("Target cluster member is offline"))
+        }
+
+        run := func(op *operations.Operation) error {
+            return migrateStorageVolume(s, r, volumeName, srcPoolName, targetMemberInfo.Name, targetProjectName, req, op)
+        }
+
+        resources := map[string][]api.URL{}
+        resources["storage_volumes"] = []api.URL{*api.NewURL().Path(version.APIVersion, "storage-pools", srcPoolName, "volumes", "custom", volumeName)}
+
+        op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, operationtype.VolumeMigrate, resources, nil, run, nil, nil, r)
+        if err != nil {
+            return response.InternalError(err)
+        }
+
+        return operations.OperationResponse(op)
+    }
+
     resp := forwardedResponseIfTargetIsRemote(s, r)
     if resp != nil {
         return resp
@@ -1086,9 +1305,12 @@ func storagePoolVolumePost(d *Daemon, r *http.Request) response.Response {
         return response.BadRequest(err)
     }
 
-    resp = forwardedResponseIfVolumeIsRemote(s, r, srcPoolName, projectName, volumeName, volumeType)
-    if resp != nil {
-        return resp
+    // If source is set, we know the source and the target, and therefore don't need this function to figure out where to forward the request to.
+    if req.Source.Location == "" {
+        resp = forwardedResponseIfVolumeIsRemote(s, r, srcPoolName, projectName, volumeName, volumeType)
+        if resp != nil {
+            return resp
+        }
     }
 
     // This is a migration request so send back requested secrets.
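A hypothetical request body for the single-call move path handled above; `vol1`, `default`, `lxd01` and `lxd02` are placeholders.

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/canonical/lxd/shared/api"
)

func main() {
	// Sent as: POST /1.0/storage-pools/default/volumes/custom/vol1?target=lxd02
	req := api.StorageVolumePost{
		Name:      "vol1",
		Pool:      "default",
		Migration: true,
		Source: api.StorageVolumeSource{
			Location: "lxd01", // Cluster member the volume currently lives on.
		},
	}

	body, _ := json.MarshalIndent(req, "", "  ")
	fmt.Println(string(body))
}
```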
@@ -1178,6 +1400,139 @@ func storagePoolVolumePost(d *Daemon, r *http.Request) response.Response {
     return storagePoolVolumeTypePostMove(s, r, srcPoolName, projectName, targetProjectName, &dbVolume.StorageVolume, req)
 }
 
+func migrateStorageVolume(s *state.State, r *http.Request, sourceVolumeName string, sourcePoolName string, targetNode string, projectName string, req api.StorageVolumePost, op *operations.Operation) error {
+    if targetNode == req.Source.Location {
+        return fmt.Errorf("Target must be different from the storage volume's current location")
+    }
+
+    var err error
+    var srcMember, newMember db.NodeInfo
+
+    // Load the cluster member records for the volume's current location and the migration target.
+    err = s.DB.Cluster.Transaction(s.ShutdownCtx, func(ctx context.Context, tx *db.ClusterTx) error {
+        srcMember, err = tx.GetNodeByName(ctx, req.Source.Location)
+        if err != nil {
+            return fmt.Errorf("Failed getting current cluster member of storage volume %q", req.Source.Name)
+        }
+
+        newMember, err = tx.GetNodeByName(ctx, targetNode)
+        if err != nil {
+            return fmt.Errorf("Failed loading new cluster member for storage volume: %w", err)
+        }
+
+        return nil
+    })
+    if err != nil {
+        return err
+    }
+
+    srcPool, err := storagePools.LoadByName(s, sourcePoolName)
+    if err != nil {
+        return fmt.Errorf("Failed loading storage volume storage pool: %w", err)
+    }
+
+    f, err := storageVolumePostClusteringMigrate(s, r, srcPool, projectName, sourceVolumeName, req.Pool, req.Project, req.Name, srcMember, newMember, req.VolumeOnly)
+    if err != nil {
+        return err
+    }
+
+    return f(op)
+}
+
+func storageVolumePostClusteringMigrate(s *state.State, r *http.Request, srcPool storagePools.Pool, srcProjectName string, srcVolumeName string, newPoolName string, newProjectName string, newVolumeName string, srcMember db.NodeInfo, newMember db.NodeInfo, volumeOnly bool) (func(op *operations.Operation) error, error) {
+    srcMemberOffline := srcMember.IsOffline(s.GlobalConfig.OfflineThreshold())
+
+    // Make sure that the source member is online if we end up being called from another member after a
+    // redirection due to the source member being offline.
+    if srcMemberOffline {
+        return nil, fmt.Errorf("The cluster member hosting the storage volume is offline")
+    }
+
+    run := func(op *operations.Operation) error {
+        if newVolumeName == "" {
+            newVolumeName = srcVolumeName
+        }
+
+        networkCert := s.Endpoints.NetworkCert()
+
+        // Connect to the destination member, i.e. the member to migrate the custom volume to.
+        // Use the notify argument to indicate to the destination that we are moving a custom volume between
+        // cluster members.
+        dest, err := lxdCluster.Connect(newMember.Address, networkCert, s.ServerCert(), r, true)
+        if err != nil {
+            return fmt.Errorf("Failed to connect to destination server %q: %w", newMember.Address, err)
+        }
+
+        dest = dest.UseTarget(newMember.Name).UseProject(srcProjectName)
+
+        resources := map[string][]api.URL{}
+        resources["storage_volumes"] = []api.URL{*api.NewURL().Path(version.APIVersion, "storage-pools", srcPool.Name(), "volumes", "custom", srcVolumeName)}
+
+        srcMigration, err := newStorageMigrationSource(volumeOnly, nil)
+        if err != nil {
+            return fmt.Errorf("Failed setting up storage volume migration on source: %w", err)
+        }
+
+        run := func(op *operations.Operation) error {
+            err := srcMigration.DoStorage(s, srcProjectName, srcPool.Name(), srcVolumeName, op)
+            if err != nil {
+                return err
+            }
+
+            err = srcPool.DeleteCustomVolume(srcProjectName, srcVolumeName, op)
+            if err != nil {
+                return err
+            }
+
+            return nil
+        }
+
+        cancel := func(op *operations.Operation) error {
+            srcMigration.disconnect()
+            return nil
+        }
+
+        srcOp, err := operations.OperationCreate(s, srcProjectName, operations.OperationClassWebsocket, operationtype.VolumeMigrate, resources, srcMigration.Metadata(), run, cancel, srcMigration.Connect, r)
+        if err != nil {
+            return err
+        }
+
+        err = srcOp.Start()
+        if err != nil {
+            return fmt.Errorf("Failed starting migration source operation: %w", err)
+        }
+
+        sourceSecrets := make(map[string]string, len(srcMigration.conns))
+        for connName, conn := range srcMigration.conns {
+            sourceSecrets[connName] = conn.Secret()
+        }
+
+        // Request pull mode migration on destination.
+        err = dest.CreateStoragePoolVolume(newPoolName, api.StorageVolumesPost{
+            Name: newVolumeName,
+            Type: "custom",
+            Source: api.StorageVolumeSource{
+                Type:        "migration",
+                Mode:        "pull",
+                Operation:   fmt.Sprintf("https://%s%s", srcMember.Address, srcOp.URL()),
+                Websockets:  sourceSecrets,
+                Certificate: string(networkCert.PublicKey()),
+                Name:        newVolumeName,
+                Pool:        newPoolName,
+                Project:     newProjectName,
+            },
+        })
+        if err != nil {
+            return fmt.Errorf("Failed requesting volume create on destination: %w", err)
+        }
+
+        return nil
+    }
+
+    return run, nil
+}
+
 // storagePoolVolumeTypePostMigration handles volume migration type POST requests.
 func storagePoolVolumeTypePostMigration(state *state.State, r *http.Request, requestProjectName string, projectName string, poolName string, volumeName string, req api.StorageVolumePost) response.Response {
     ws, err := newStorageMigrationSource(req.VolumeOnly, req.Target)
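Putting the pieces together, a rough Go client sketch of copying a custom volume between cluster members in one call. It assumes the existing client helpers (`ConnectLXDUnix`, `UseTarget`, `GetStoragePoolVolume`, `CopyStoragePoolVolume`) keep their current signatures; pool, volume and member names are made up.

```go
package example

import (
	lxd "github.com/canonical/lxd/client"
)

// copyWithinCluster copies custom volume "vol1" from member lxd01 to member
// lxd02; with cluster_internal_custom_volume_copy the client fills in
// source.location and the server handles the transfer internally.
func copyWithinCluster() error {
	c, err := lxd.ConnectLXDUnix("", nil) // Default unix socket.
	if err != nil {
		return err
	}

	src := c.UseTarget("lxd01")
	dst := c.UseTarget("lxd02")

	// Fetch the source volume record from the member currently hosting it.
	vol, _, err := src.GetStoragePoolVolume("default", "custom", "vol1")
	if err != nil {
		return err
	}

	// Ask the target member to copy it; source.location is taken from src's target.
	op, err := dst.CopyStoragePoolVolume("default", src, "default", *vol, &lxd.StoragePoolVolumeCopyArgs{Name: "vol1"})
	if err != nil {
		return err
	}

	return op.Wait()
}
```

diff --git a/shared/api/storage_pool_volume.go b/shared/api/storage_pool_volume.go
index 51a62547b532..d7975aaafc3d 100644
--- a/shared/api/storage_pool_volume.go
+++ b/shared/api/storage_pool_volume.go
@@ -70,6 +70,11 @@ type StorageVolumePost struct {
     //
     // API extension: storage_volume_project_move
     Project string `json:"project,omitempty" yaml:"project,omitempty"`
+
+    // Migration source
+    //
+    // API extension: cluster_internal_custom_volume_copy
+    Source StorageVolumeSource `json:"source" yaml:"source"`
 }
 
 // StorageVolumePostTarget represents the migration target host and operation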
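Finally, a small sketch of how a client might gate on the new extension before relying on the single-call behaviour; the connection details are an assumption.

```go
package example

import (
	lxd "github.com/canonical/lxd/client"
)

// supportsClusterInternalVolumeCopy reports whether the connected server
// advertises the extension added at the end of the list above.
func supportsClusterInternalVolumeCopy() (bool, error) {
	c, err := lxd.ConnectLXDUnix("", nil)
	if err != nil {
		return false, err
	}

	return c.HasExtension("cluster_internal_custom_volume_copy"), nil
}
```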