From dc578c808b8cb23801a13a17498c354e32c73057 Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Thu, 21 Sep 2023 09:33:24 +0200 Subject: [PATCH 1/8] api: Add cluster_internal_custom_volume_copy Signed-off-by: Thomas Hipp --- doc/api-extensions.md | 6 ++++++ shared/version/api.go | 1 + 2 files changed, 7 insertions(+) diff --git a/doc/api-extensions.md b/doc/api-extensions.md index 79828ccaff29..26290cc41d35 100644 --- a/doc/api-extensions.md +++ b/doc/api-extensions.md @@ -2312,3 +2312,9 @@ no effect on existing devices. This API extension indicates that the `/1.0/operations/{id}/wait` endpoint exists on the server. This indicates to the client that the endpoint can be used to wait for an operation to complete rather than waiting for an operation event via the `/1.0/events` endpoint. + +## `cluster_internal_custom_volume_copy` + +This extension adds support for copying and moving custom storage volumes within a cluster with a single API call. +Calling `POST /1.0/storage-pools/{pool}/volumes/custom?target={target}` will copy the custom volume specified in the `source` part of the request. +Calling `POST /1.0/storage-pools/{pool}/volumes/custom/{volume}?target={target}` will move the custom volume from the source, specified in the `source` part of the request, to the target. diff --git a/shared/version/api.go b/shared/version/api.go index 22f3db0422e9..19becf109d4d 100644 --- a/shared/version/api.go +++ b/shared/version/api.go @@ -387,6 +387,7 @@ var APIExtensions = []string{ "instances_nic_limits_priority", "disk_initial_volume_configuration", "operation_wait", + "cluster_internal_custom_volume_copy", } // APIExtensionsCount returns the number of available API extensions. 
From 62997e47ba79b9705b315bc6977106744bf94948 Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Thu, 21 Sep 2023 09:46:23 +0200 Subject: [PATCH 2/8] shared/api: Add Location to StorageVolumeSource Signed-off-by: Thomas Hipp --- shared/api/storage_pool_volume.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/shared/api/storage_pool_volume.go b/shared/api/storage_pool_volume.go index 51a62547b532..bc2b0d3fcd6e 100644 --- a/shared/api/storage_pool_volume.go +++ b/shared/api/storage_pool_volume.go @@ -231,6 +231,12 @@ type StorageVolumeSource struct { // // API extension: storage_api_project Project string `json:"project,omitempty" yaml:"project,omitempty"` + + // What cluster member this record was found on + // Example: lxd01 + // + // API extension: cluster_internal_custom_volume_copy + Location string `json:"location" yaml:"location"` } // Writable converts a full StorageVolume struct into a StorageVolumePut struct (filters read-only fields). From 2672e88a5158a3d715bfdb85386e7ccf62da7d48 Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Tue, 10 Oct 2023 14:54:34 +0200 Subject: [PATCH 3/8] shared/api: Add Source to StorageVolumePost Signed-off-by: Thomas Hipp --- shared/api/storage_pool_volume.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/shared/api/storage_pool_volume.go b/shared/api/storage_pool_volume.go index bc2b0d3fcd6e..d7975aaafc3d 100644 --- a/shared/api/storage_pool_volume.go +++ b/shared/api/storage_pool_volume.go @@ -70,6 +70,11 @@ type StorageVolumePost struct { // // API extension: storage_volume_project_move Project string `json:"project,omitempty" yaml:"project,omitempty"` + + // Migration source + // + // API extension: cluster_internal_custom_volume_copy + Source StorageVolumeSource `json:"source" yaml:"source"` } // StorageVolumePostTarget represents the migration target host and operation From 5ffa78bbc76b460b18f3f2e73473afb0f37ba28a Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Fri, 13 Oct 2023 16:53:11 +0200 Subject: 
[PATCH 4/8] lxd/db: Add function to update storage volume node Signed-off-by: Thomas Hipp --- lxd/db/storage_volumes.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/lxd/db/storage_volumes.go b/lxd/db/storage_volumes.go index 73e84b0793ef..1fe3942ffddd 100644 --- a/lxd/db/storage_volumes.go +++ b/lxd/db/storage_volumes.go @@ -953,3 +953,34 @@ func (c *ClusterTx) GetStorageVolumeURIs(ctx context.Context, project string) ([ return uris, nil } + +// UpdateStorageVolumeNode changes the name of a storage volume and the cluster member hosting it. +// It's meant to be used when moving a storage volume backed by ceph from one cluster node to another. +func (c *ClusterTx) UpdateStorageVolumeNode(ctx context.Context, projectName string, oldName string, newName string, newMemberName string, poolID int64, volumeType int) error { + volume, err := c.GetStoragePoolVolume(ctx, poolID, projectName, volumeType, oldName, false) + if err != nil { + return err + } + + member, err := c.GetNodeByName(ctx, newMemberName) + if err != nil { + return fmt.Errorf("Failed to get new member %q info: %w", newMemberName, err) + } + + stmt := "UPDATE storage_volumes SET node_id=?, name=? WHERE id=?" 
+ result, err := c.tx.ExecContext(ctx, stmt, member.ID, newName, volume.ID) + if err != nil { + return fmt.Errorf("Failed to update volume's name and member ID: %w", err) + } + + n, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("Failed to get rows affected by volume update: %w", err) + } + + if n != 1 { + return fmt.Errorf("Unexpected number of updated rows in storage_volumes table: %d", n) + } + + return nil +} From d26048d019ec4b1c30dac64ad65481f42848468b Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Tue, 10 Oct 2023 15:30:44 +0200 Subject: [PATCH 5/8] lxd: Handle copying storage volumes with a single API call Signed-off-by: Thomas Hipp --- lxd/storage_volumes.go | 85 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 3 deletions(-) diff --git a/lxd/storage_volumes.go b/lxd/storage_volumes.go index 029d05233d91..0b157bd61d28 100644 --- a/lxd/storage_volumes.go +++ b/lxd/storage_volumes.go @@ -623,6 +623,39 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) response.Response { return response.Conflict(fmt.Errorf("Volume by that name already exists")) } + target := queryParam(r, "target") + + // Check if we need to switch to migration + clustered, err := lxdCluster.Enabled(s.DB.Node) + if err != nil { + return response.SmartError(err) + } + + serverName := s.ServerName + var nodeAddress string + + if clustered && target != "" && !req.Source.Refresh && (req.Source.Location != "" && serverName != req.Source.Location) { + err := s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error { + nodeInfo, err := tx.GetNodeByName(ctx, req.Source.Location) + if err != nil { + return err + } + + nodeAddress = nodeInfo.Address + + return nil + }) + if err != nil { + return response.SmartError(err) + } + + if nodeAddress == "" { + return response.BadRequest(fmt.Errorf("The source is currently offline")) + } + + return clusterCopyCustomVolumeInternal(s, r, nodeAddress, projectName, poolName, &req) + 
+ } + switch req.Source.Type { case "": return doVolumeCreateOrCopy(s, r, projectParam(r), projectName, poolName, &req) @@ -639,6 +672,49 @@ func storagePoolVolumesTypePost(d *Daemon, r *http.Request) response.Response { } } +func clusterCopyCustomVolumeInternal(s *state.State, r *http.Request, sourceAddress string, projectName string, poolName string, req *api.StorageVolumesPost) response.Response { + websockets := map[string]string{} + + client, err := lxdCluster.Connect(sourceAddress, s.Endpoints.NetworkCert(), s.ServerCert(), r, false) + if err != nil { + return response.SmartError(err) + } + + client = client.UseProject(req.Source.Project) + + pullReq := api.StorageVolumePost{ + Name: req.Source.Name, + Pool: req.Source.Pool, + Migration: true, + VolumeOnly: req.Source.VolumeOnly, + Project: req.Source.Project, + Source: api.StorageVolumeSource{ + Location: req.Source.Location, + }, + } + + op, err := client.MigrateStoragePoolVolume(req.Source.Pool, pullReq) + if err != nil { + return response.SmartError(err) + } + + opAPI := op.Get() + + for k, v := range opAPI.Metadata { + websockets[k] = v.(string) + } + + // Reset the source for a migration + req.Source.Type = "migration" + req.Source.Certificate = string(s.Endpoints.NetworkCert().PublicKey()) + req.Source.Mode = "pull" + req.Source.Operation = fmt.Sprintf("https://%s/%s/operations/%s", sourceAddress, version.APIVersion, opAPI.ID) + req.Source.Websockets = websockets + req.Source.Project = "" + + return doVolumeMigration(s, r, projectParam(r), projectName, poolName, req) +} + func doCustomVolumeRefresh(s *state.State, r *http.Request, requestProjectName string, projectName string, poolName string, req *api.StorageVolumesPost) response.Response { var run func(op *operations.Operation) error @@ -1067,9 +1143,12 @@ func storagePoolVolumePost(d *Daemon, r *http.Request) response.Response { return response.BadRequest(err) } - resp = forwardedResponseIfVolumeIsRemote(s, r, srcPoolName, projectName, volumeName, 
volumeType) - if resp != nil { - return resp + // If source is set, there's no need to forward the request to a specific cluster member. + if req.Source.Location == "" { + resp = forwardedResponseIfVolumeIsRemote(s, r, srcPoolName, projectName, volumeName, volumeType) + if resp != nil { + return resp + } } // This is a migration request so send back requested secrets. From c20244c8c75d53b18d8f1032bb4b9eaa643faeac Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Fri, 13 Oct 2023 20:14:17 +0200 Subject: [PATCH 6/8] lxd: Support single API custom volume rename Signed-off-by: Thomas Hipp --- lxd/storage_volumes.go | 278 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 277 insertions(+), 1 deletion(-) diff --git a/lxd/storage_volumes.go b/lxd/storage_volumes.go index 0b157bd61d28..a0a2ed76e09a 100644 --- a/lxd/storage_volumes.go +++ b/lxd/storage_volumes.go @@ -24,6 +24,7 @@ import ( lxdCluster "github.com/canonical/lxd/lxd/cluster" "github.com/canonical/lxd/lxd/db" "github.com/canonical/lxd/lxd/db/cluster" + dbCluster "github.com/canonical/lxd/lxd/db/cluster" "github.com/canonical/lxd/lxd/db/operationtype" "github.com/canonical/lxd/lxd/instance" "github.com/canonical/lxd/lxd/operations" @@ -1132,6 +1133,148 @@ func storagePoolVolumePost(d *Daemon, r *http.Request) response.Response { r.Body = shared.BytesReadCloser{Buf: &buf} + target := queryParam(r, "target") + + // Check if clustered. + clustered, err := lxdCluster.Enabled(s.DB.Node) + if err != nil { + return response.InternalError(fmt.Errorf("Failed checking cluster state: %w", err)) + } + + if clustered && target != "" && req.Source.Location != "" && req.Migration { + var sourceNodeOffline bool + + err = s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error { + // Load source node. + nodeInfo, err := tx.GetNodeByName(ctx, req.Source.Location) + if err != nil { + return err + } + + sourceAddress := nodeInfo.Address + + if sourceAddress == "" { + // Local node. 
+ sourceNodeOffline = false + return nil + } + + sourceMemberInfo, err := tx.GetNodeByAddress(ctx, sourceAddress) + if err != nil { + return fmt.Errorf("Failed to get source member for %q: %w", sourceAddress, err) + } + + sourceNodeOffline = sourceMemberInfo.IsOffline(s.GlobalConfig.OfflineThreshold()) + + return nil + }) + if err != nil { + return response.SmartError(err) + } + + var targetProject *api.Project + var targetMemberInfo *db.NodeInfo + + if sourceNodeOffline { + resp := forwardedResponseIfTargetIsRemote(s, r) + if resp != nil { + return resp + } + + srcPool, err := storagePools.LoadByName(s, srcPoolName) + if err != nil { + return response.SmartError(err) + } + + if srcPool.Driver().Info().Name == "ceph" { + // Load source volume. + srcPoolID, err := s.DB.Cluster.GetStoragePoolID(srcPoolName) + if err != nil { + return response.SmartError(err) + } + + var dbVolume *db.StorageVolume + + err = s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error { + dbVolume, err = tx.GetStoragePoolVolume(ctx, srcPoolID, projectName, db.StoragePoolVolumeTypeCustom, volumeName, true) + return err + }) + if err != nil { + // Check if the user provided an incorrect target query parameter and return a helpful error message. 
+ _, volumeNotFound := api.StatusErrorMatch(err, http.StatusNotFound) + targetIsSet := r.URL.Query().Get("target") != "" + serverIsClustered, _ := lxdCluster.Enabled(s.DB.Node) + + if serverIsClustered && targetIsSet && volumeNotFound { + return response.NotFound(fmt.Errorf("Storage volume not found on this cluster member")) + } + + return response.SmartError(err) + } + + req := api.StorageVolumePost{ + Name: req.Name, + } + + return storagePoolVolumeTypePostRename(s, r, srcPool.Name(), projectName, &dbVolume.StorageVolume, req) + } + } else { + resp := forwardedResponseToNode(s, r, req.Source.Location) + if resp != nil { + return resp + } + } + + err = s.DB.Cluster.Transaction(r.Context(), func(ctx context.Context, tx *db.ClusterTx) error { + p, err := dbCluster.GetProject(ctx, tx.Tx(), projectName) + if err != nil { + return err + } + + targetProject, err = p.ToAPI(ctx, tx.Tx()) + if err != nil { + return err + } + + allMembers, err := tx.GetNodes(ctx) + if err != nil { + return fmt.Errorf("Failed getting cluster members: %w", err) + } + + targetMemberInfo, _, err = project.CheckTarget(ctx, s.Authorizer, r, tx, targetProject, target, allMembers) + if err != nil { + return err + } + + if targetMemberInfo == nil { + return fmt.Errorf("Failed checking cluster member %q", target) + } + + return nil + }) + if err != nil { + return response.SmartError(err) + } + + if targetMemberInfo.IsOffline(s.GlobalConfig.OfflineThreshold()) { + return response.BadRequest(fmt.Errorf("Target cluster member is offline")) + } + + run := func(op *operations.Operation) error { + return migrateStorageVolume(s, r, volumeName, srcPoolName, targetMemberInfo.Name, targetProjectName, req, op) + } + + resources := map[string][]api.URL{} + resources["storage_volumes"] = []api.URL{*api.NewURL().Path(version.APIVersion, "storage-pools", srcPoolName, "volumes", "custom", volumeName)} + + op, err := operations.OperationCreate(s, projectName, operations.OperationClassTask, 
operationtype.VolumeMigrate, resources, nil, run, nil, nil, r) + if err != nil { + return response.InternalError(err) + } + + return operations.OperationResponse(op) + } + resp := forwardedResponseIfTargetIsRemote(s, r) if resp != nil { return resp @@ -1143,7 +1286,7 @@ func storagePoolVolumePost(d *Daemon, r *http.Request) response.Response { return response.BadRequest(err) } - // If source is set, there's no need to forward the request to a specific cluster member. + // If source is set, we know the source and the target, and therefore don't need this function to figure out where to forward the request to. if req.Source.Location == "" { resp = forwardedResponseIfVolumeIsRemote(s, r, srcPoolName, projectName, volumeName, volumeType) if resp != nil { @@ -1238,6 +1381,139 @@ func storagePoolVolumePost(d *Daemon, r *http.Request) response.Response { return storagePoolVolumeTypePostMove(s, r, srcPoolName, projectName, targetProjectName, &dbVolume.StorageVolume, req) } +func migrateStorageVolume(s *state.State, r *http.Request, sourceVolumeName string, sourcePoolName string, targetNode string, projectName string, req api.StorageVolumePost, op *operations.Operation) error { + if targetNode == req.Source.Location { + return fmt.Errorf("Target must be different than storage volume's current location") + } + + var err error + var srcMember, newMember db.NodeInfo + + // Load the source and target cluster member records; both are handed to the migration 
+ // helper below. + err = s.DB.Cluster.Transaction(s.ShutdownCtx, func(ctx context.Context, tx *db.ClusterTx) error { + srcMember, err = tx.GetNodeByName(ctx, req.Source.Location) + if err != nil { + return fmt.Errorf("Failed getting current cluster member of storage volume %q: %w", sourceVolumeName, err) + } + + newMember, err = tx.GetNodeByName(ctx, targetNode) + if err != nil { + return fmt.Errorf("Failed loading new cluster member for storage volume: %w", err) + } + + return nil + }) + if err != nil { + return err + } + + srcPool, err := storagePools.LoadByName(s, sourcePoolName) + if err != nil { + return fmt.Errorf("Failed loading storage volume storage pool: %w", err) + } + + f, err := storageVolumePostClusteringMigrate(s, r, srcPool, projectName, sourceVolumeName, req.Pool, req.Project, req.Name, srcMember, newMember, req.VolumeOnly) + if err != nil { + return err + } + + return f(op) +} + +func storageVolumePostClusteringMigrate(s *state.State, r *http.Request, srcPool storagePools.Pool, srcProjectName string, srcVolumeName string, newPoolName string, newProjectName string, newVolumeName string, srcMember db.NodeInfo, newMember db.NodeInfo, volumeOnly bool) (func(op *operations.Operation) error, error) { + srcMemberOffline := srcMember.IsOffline(s.GlobalConfig.OfflineThreshold()) + + // Make sure that the source member is online if we end up being called from another member after a + // redirection due to the source member being offline. + if srcMemberOffline { + return nil, fmt.Errorf("The cluster member hosting the storage volume is offline") + } + + run := func(op *operations.Operation) error { + if newVolumeName == "" { + newVolumeName = srcVolumeName + } + + networkCert := s.Endpoints.NetworkCert() + + // Connect to the destination member, i.e. the member to migrate the custom volume to. + // Use the notify argument to indicate to the destination that we are moving a custom volume between + // cluster members. 
+ dest, err := lxdCluster.Connect(newMember.Address, networkCert, s.ServerCert(), r, true) + if err != nil { + return fmt.Errorf("Failed to connect to destination server %q: %w", newMember.Address, err) + } + + dest = dest.UseTarget(newMember.Name).UseProject(srcProjectName) + + resources := map[string][]api.URL{} + resources["storage_volumes"] = []api.URL{*api.NewURL().Path(version.APIVersion, "storage-pools", srcPool.Name(), "volumes", "custom", srcVolumeName)} + + srcMigration, err := newStorageMigrationSource(volumeOnly, nil) + if err != nil { + return fmt.Errorf("Failed setting up storage volume migration on source: %w", err) + } + + run := func(op *operations.Operation) error { + err := srcMigration.DoStorage(s, srcProjectName, srcPool.Name(), srcVolumeName, op) + if err != nil { + return err + } + + err = srcPool.DeleteCustomVolume(srcProjectName, srcVolumeName, op) + if err != nil { + return err + } + + return nil + } + + cancel := func(op *operations.Operation) error { + srcMigration.disconnect() + return nil + } + + srcOp, err := operations.OperationCreate(s, srcProjectName, operations.OperationClassWebsocket, operationtype.VolumeMigrate, resources, srcMigration.Metadata(), run, cancel, srcMigration.Connect, r) + if err != nil { + return err + } + + err = srcOp.Start() + if err != nil { + return fmt.Errorf("Failed starting migration source operation: %w", err) + } + + sourceSecrets := make(map[string]string, len(srcMigration.conns)) + for connName, conn := range srcMigration.conns { + sourceSecrets[connName] = conn.Secret() + } + + // Request pull mode migration on destination. 
+ err = dest.CreateStoragePoolVolume(newPoolName, api.StorageVolumesPost{ + Name: newVolumeName, + Type: "custom", + Source: api.StorageVolumeSource{ + Type: "migration", + Mode: "pull", + Operation: fmt.Sprintf("https://%s%s", srcMember.Address, srcOp.URL()), + Websockets: sourceSecrets, + Certificate: string(networkCert.PublicKey()), + Name: newVolumeName, + Pool: newPoolName, + Project: newProjectName, + }, + }) + if err != nil { + return fmt.Errorf("Failed requesting storage volume create on destination: %w", err) + } + + return nil + } + + return run, nil +} + // storagePoolVolumeTypePostMigration handles volume migration type POST requests. func storagePoolVolumeTypePostMigration(state *state.State, r *http.Request, requestProjectName string, projectName string, poolName string, volumeName string, req api.StorageVolumePost) response.Response { ws, err := newStorageMigrationSource(req.VolumeOnly, req.Target) From 0b163440b40f87f6aa61f785ed6f3fc4937519d1 Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Tue, 10 Oct 2023 15:38:33 +0200 Subject: [PATCH 7/8] client: Set Source.Location if supported Signed-off-by: Thomas Hipp --- client/lxd_storage_volumes.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/lxd_storage_volumes.go b/client/lxd_storage_volumes.go index a12f6fbbc532..5367358ba388 100644 --- a/client/lxd_storage_volumes.go +++ b/client/lxd_storage_volumes.go @@ -555,8 +555,10 @@ func (r *ProtocolLXD) CopyStoragePoolVolume(pool string, source InstanceServer, return nil, fmt.Errorf("Failed to get destination connection info: %w", err) } + clusterInternalVolumeCopy := r.CheckExtension("cluster_internal_custom_volume_copy") == nil + // Copy the storage pool volume locally. 
- if destInfo.URL == sourceInfo.URL && destInfo.SocketPath == sourceInfo.SocketPath && (volume.Location == r.clusterTarget || (volume.Location == "none" && r.clusterTarget == "")) { + if destInfo.URL == sourceInfo.URL && destInfo.SocketPath == sourceInfo.SocketPath && (volume.Location == r.clusterTarget || (volume.Location == "none" && r.clusterTarget == "") || clusterInternalVolumeCopy) { // Project handling if destInfo.Project != sourceInfo.Project { if !r.HasExtension("storage_api_project") { @@ -566,6 +568,10 @@ func (r *ProtocolLXD) CopyStoragePoolVolume(pool string, source InstanceServer, req.Source.Project = sourceInfo.Project } + if clusterInternalVolumeCopy { + req.Source.Location = sourceInfo.Target + } + // Send the request op, _, err := r.queryOperation("POST", fmt.Sprintf("/storage-pools/%s/volumes/%s", url.PathEscape(pool), url.PathEscape(volume.Type)), req, "", true) if err != nil { From b7e758a88ff170df9c801ea78ffe4a65d8c700b2 Mon Sep 17 00:00:00 2001 From: Thomas Hipp Date: Mon, 16 Oct 2023 09:30:36 +0200 Subject: [PATCH 8/8] doc: Update API Signed-off-by: Thomas Hipp --- doc/rest-api.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/doc/rest-api.yaml b/doc/rest-api.yaml index 27fa9695d81c..56b8f697d8c4 100644 --- a/doc/rest-api.yaml +++ b/doc/rest-api.yaml @@ -5827,6 +5827,8 @@ definitions: example: foo type: string x-go-name: Project + source: + $ref: '#/definitions/StorageVolumeSource' target: $ref: '#/definitions/StorageVolumePostTarget' volume_only: @@ -5981,6 +5983,11 @@ definitions: example: X509 PEM certificate type: string x-go-name: Certificate + location: + description: What cluster member this record was found on + example: lxd01 + type: string + x-go-name: Location mode: description: Whether to use pull or push mode (for migration) example: pull