diff --git a/Makefile b/Makefile index 24116fbb0c7..57024dc20f9 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,8 @@ endif GO_PROJECT=github.com/ceph/ceph-csi CEPH_VERSION ?= $(shell . $(CURDIR)/build.env ; echo $${CEPH_VERSION}) -GO_TAGS_LIST ?= $(CEPH_VERSION) +# TODO: ceph_preview tag required for FSQuiesce API +GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview # go build flags LDFLAGS ?= diff --git a/go.mod b/go.mod index b5e20db05e3..ac06b7d4426 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ toolchain go1.21.5 require ( github.com/IBM/keyprotect-go-client v0.12.2 - github.com/aws/aws-sdk-go v1.50.26 + github.com/aws/aws-sdk-go v1.50.32 github.com/aws/aws-sdk-go-v2/service/sts v1.28.1 github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000 - github.com/ceph/go-ceph v0.26.0 + github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243 github.com/container-storage-interface/spec v1.9.0 github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444 github.com/gemalto/kmip-go v0.0.10 diff --git a/go.sum b/go.sum index 14e0f5f640f..cbf0c5133f2 100644 --- a/go.sum +++ b/go.sum @@ -833,8 +833,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.44.164/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/aws/aws-sdk-go v1.50.26 h1:tuv8+dje59DBK1Pj65tSCdD36oamBxKYJgbng4bFylc= -github.com/aws/aws-sdk-go v1.50.26/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.50.32 h1:POt81DvegnpQKM4DMDLlHz1CO6OBnEoQ1gRhYFd7QRY= +github.com/aws/aws-sdk-go v1.50.32/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w= github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8= @@ -870,8 +870,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= -github.com/ceph/go-ceph v0.26.0 h1:LZoATo25ZH5aeL5t85BwIbrNLKCDfcDM+e0qV0cmwHY= -github.com/ceph/go-ceph v0.26.0/go.mod h1:ISxb295GszZwtLPkeWi+L2uLYBVsqbsh0M104jZMOX4= +github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243 h1:O99PJ2rNxY+XiN2swRSmJC24V3YInVt5Lk48Em1cdVE= +github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243/go.mod h1:PS15ql+uqcnZN8uD3WuxlImxdaTYtxqJoaTmlFJYnbI= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index dac4bad3b9c..e6de82ea541 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -56,6 +56,10 @@ type ControllerServer struct { // A map storing 
all volumes/snapshots with ongoing operations. OperationLocks *util.OperationLock + // A map storing all volumes with ongoing operations so that additional operations + // for that same volume (as defined by volumegroup ID/volumegroup name) return an Aborted error + VolumeGroupLocks *util.VolumeLocks + // Cluster name ClusterName string diff --git a/internal/cephfs/core/quiesce.go b/internal/cephfs/core/quiesce.go new file mode 100644 index 00000000000..d867e3ef9f3 --- /dev/null +++ b/internal/cephfs/core/quiesce.go @@ -0,0 +1,250 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "context" + + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/ceph/go-ceph/cephfs/admin" +) + +type QuiesceState string + +const ( + Released QuiesceState = "RELEASED" + Quiescing QuiesceState = "QUIESCING" + Quiesced QuiesceState = "QUIESCED" +) + +// GetQuiesceState returns the quiesce state of the filesystem. +func GetQuiesceState(set admin.QuiesceState) QuiesceState { + var state QuiesceState + switch set.Name { + case "RELEASED": + state = Released + case "QUIESCING": + state = Quiescing + case "QUIESCED": + state = Quiesced + default: + state = QuiesceState(set.Name) + } + + return state +} + +type FSQuiesceClient interface { + // Destroy destroys the connection used for FSAdmin. + Destroy() + // FSQuiesce quiesces the subvolumes in the filesystem. + FSQuiesce( + ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) + // GetVolumes returns the list of volumes in the filesystem that are to be + // quiesced. + GetVolumes() []Volume + // FSQuiesceWithExpireTimeout quiesces the subvolumes in the filesystem + // with an expiration timeout. it should be used after FSQuiesce to reset + // the expire timeout. This helps in keeping the subvolumes in the + // filesystem in quiesced state until all snapshots are taken. + FSQuiesceWithExpireTimeout(ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) + // ResetFSQuiesce resets the quiesce timeout for the subvolumes in + // the filesystem. + ResetFSQuiesce(ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) + // ReleaseFSQuiesce releases the quiesce on the subvolumes in the + // filesystem. + ReleaseFSQuiesce(ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) +} + +type Volume struct { + VolumeID string + ClusterID string +} + +type fsQuiesce struct { + connection *util.ClusterConnection + fsName string + volumes []Volume + // subVolumeGroupMapping is a map of subvolumes to groups. + subVolumeGroupMapping map[string][]string + fsa *admin.FSAdmin +} + +// NewFSQuiesce returns a new instance of fsQuiesce. It +// take the filesystem name, the list of volumes to be quiesced, the mapping of +// subvolumes to groups and the cluster connection as input. 
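+//
+// An illustrative usage sketch (the names below are assumptions for the
+// example, not part of this change):
+//
+//	vols := []Volume{{VolumeID: "vol-1", ClusterID: "ceph-a"}}
+//	mapping := map[string][]string{"csi": {"subvol-1"}}
+//	fq, err := NewFSQuiesce("myfs", vols, mapping, conn)
+//	if err != nil {
+//		return err
+//	}
+//	defer fq.Destroy()
+//	info, err := fq.FSQuiesce(ctx, "vg-snap-request-1")
+//	if err != nil {
+//		return err
+//	}
+//	state := GetQuiesceState(info.State)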
+func NewFSQuiesce( + fsName string, + volumes []Volume, + mapping map[string][]string, + conn *util.ClusterConnection, +) (FSQuiesceClient, error) { + fsa, err := conn.GetFSAdmin() + if err != nil { + return nil, err + } + + return &fsQuiesce{ + connection: conn, + fsName: fsName, + volumes: volumes, + subVolumeGroupMapping: mapping, + fsa: fsa, + }, nil +} + +// Destroy destroys the connection used for FSAdmin. +func (fq *fsQuiesce) Destroy() { + if fq.connection != nil { + fq.connection.Destroy() + } +} + +// GetVolumes returns the list of volumes in the filesystem that are to be +// quiesced. +func (fq *fsQuiesce) GetVolumes() []Volume { + return fq.volumes +} + +// getMembers returns the list of names in the format +// group/subvolume that are to be quiesced. This is the format that the +// ceph fs quiesce expects. +// Example: ["group1/subvolume1", "group1/subvolume2", "group2/subvolume1"]. +func (fq *fsQuiesce) getMembers() []string { + volName := []string{} + for svg, sb := range fq.subVolumeGroupMapping { + for _, s := range sb { + name := svg + "/" + s + volName = append(volName, name) + } + } + + return volName +} + +func (fq *fsQuiesce) FSQuiesce( + ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + Timeout: 180, + AwaitFor: 0, + Expiration: 180, + } + log.DebugLog(ctx, + "FSQuiesce for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to quiesce filesystem %s", err) + + return nil, err +} + +func (fq *fsQuiesce) FSQuiesceWithExpireTimeout(ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + Timeout: 180, + AwaitFor: 0, + Expiration: 180, + } + log.DebugLog(ctx, + "FSQuiesceWithExpireTimeout for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to quiesce filesystem with expire timeout %s", err) + + return nil, err +} + +func (fq *fsQuiesce) ResetFSQuiesce(ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + Reset: true, + AwaitFor: 0, + Timeout: 180, + Expiration: 180, + } + // Reset the filesystem quiesce so that the timer will be reset, and we can + // reuse the same reservation if it has already failed or timed out. 
+ log.DebugLog(ctx, + "ResetFSQuiesce for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to reset timeout for quiesce filesystem %s", err) + + return nil, err +} + +func (fq *fsQuiesce) ReleaseFSQuiesce(ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + AwaitFor: 0, + Release: true, + } + log.DebugLog(ctx, + "ReleaseFSQuiesce for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, []string{}, reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to release quiesce of filesystem %s", err) + + return nil, err +} diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 9f1957fd6a1..dd78cf5c801 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -67,6 +67,7 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { DefaultControllerServer: csicommon.NewDefaultControllerServer(d), VolumeLocks: util.NewVolumeLocks(), SnapshotLocks: util.NewVolumeLocks(), + VolumeGroupLocks: util.NewVolumeLocks(), OperationLocks: util.NewOperationLock(), } } @@ -124,6 +125,10 @@ func (fs *Driver) Run(conf *util.Config) { store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) store.SnapJournal = journal.NewCSISnapshotJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) + + store.VolumeGroupJournal = journal.NewCSIVolumeGroupJournalWithNamespace( + CSIInstanceID, + fsutil.RadosNamespace) // Initialize default library driver fs.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID) @@ -146,6 +151,10 @@ func (fs *Driver) Run(conf *util.Config) { csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, }) + + fs.cd.AddGroupControllerServiceCapabilities([]csi.GroupControllerServiceCapability_RPC_Type{ + csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT, + }) } // Create gRPC servers @@ -192,6 +201,7 @@ func (fs *Driver) Run(conf *util.Config) { IS: fs.is, CS: fs.cs, NS: fs.ns, + GS: fs.cs, } server.Start(conf.Endpoint, srv) diff --git a/internal/cephfs/errors/errors.go b/internal/cephfs/errors/errors.go index 790119d9a19..a354aa57efa 100644 --- a/internal/cephfs/errors/errors.go +++ b/internal/cephfs/errors/errors.go @@ -58,6 +58,9 @@ var ( // ErrVolumeHasSnapshots is returned when a subvolume has snapshots. ErrVolumeHasSnapshots = coreError.New("volume has snapshots") + + // ErrQuiesceInProgress is returned when quiesce operation is in progress. + ErrQuiesceInProgress = coreError.New("quiesce operation is in progress") ) // IsCloneRetryError returns true if the clone error is pending,in-progress diff --git a/internal/cephfs/groupcontrollerserver.go b/internal/cephfs/groupcontrollerserver.go new file mode 100644 index 00000000000..42ab7aec11d --- /dev/null +++ b/internal/cephfs/groupcontrollerserver.go @@ -0,0 +1,779 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "context" + "errors" + "fmt" + "sort" + "time" + + "github.com/ceph/ceph-csi/internal/cephfs/core" + cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" + "github.com/ceph/ceph-csi/internal/cephfs/store" + fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/ceph/go-ceph/cephfs/admin" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + "k8s.io/utils/strings/slices" +) + +// validateCreateVolumeGroupSnapshotRequest validates the request for creating +// a group snapshot of volumes. +func (cs *ControllerServer) validateCreateVolumeGroupSnapshotRequest( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, +) error { + if err := cs.Driver.ValidateGroupControllerServiceRequest( + csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil { + log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req)) + + return err + } + + // Check sanity of request volume group snapshot Name, Source Volume Id's + if req.GetName() == "" { + return status.Error(codes.InvalidArgument, "volume group snapshot Name cannot be empty") + } + + if len(req.GetSourceVolumeIds()) == 0 { + return status.Error(codes.InvalidArgument, "source volume ids cannot be empty") + } + + param := req.GetParameters() + // check for ClusterID and fsName + if value, ok := param["clusterID"]; !ok || value == "" { + return status.Error(codes.InvalidArgument, "missing or empty clusterID") + } + + if value, ok := param["fsName"]; !ok || value == "" { + return status.Error(codes.InvalidArgument, "missing or empty fsName") + } + + return nil +} + +// CreateVolumeGroupSnapshot creates a group snapshot of volumes. 
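+// The flow, as implemented below: validate the request, serialize on the
+// request name, quiesce every filesystem backing the source volumes, create
+// one snapshot per source volume (re-arming the quiesce expiration between
+// volumes), record each volumeID/snapshotID pair in the volume group
+// journal, and finally release the quiesce. Any failure other than a quiesce
+// still being in progress rolls back the created snapshots and the group
+// reservation.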
+func (cs *ControllerServer) CreateVolumeGroupSnapshot( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest) ( + *csi.CreateVolumeGroupSnapshotResponse, + error, +) { + if err := cs.validateCreateVolumeGroupSnapshotRequest(ctx, req); err != nil { + return nil, err + } + + requestName := req.GetName() + // Existence and conflict checks + if acquired := cs.VolumeGroupLocks.TryAcquire(requestName); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, requestName) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName) + } + defer cs.VolumeGroupLocks.Release(requestName) + + cr, err := util.NewAdminCredentials(req.GetSecrets()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + vg, err := store.NewVolumeGroupOptions(ctx, req, cr) + if err != nil { + log.ErrorLog(ctx, "failed to get volume group options: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + defer vg.Destroy() + + vgs, err := store.CheckVolumeGroupSnapExists(ctx, vg, cr) + if err != nil { + log.ErrorLog(ctx, "failed to check volume group snapshot exists: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + + // Get the fs names and subvolume from the volume ids to execute quiesce commands. + fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), req.GetSourceVolumeIds(), cr) + if err != nil { + log.ErrorLog(ctx, "failed to get fs names and subvolume from volume ids: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + defer destroyFSConnections(fsMap) + + needRelease := checkIfFSNeedQuiesceRelease(vgs, req.GetSourceVolumeIds()) + if needRelease { + return cs.releaseQuiesceAndGetVolumeGroupSnapshotResponse(ctx, req, vgs, fsMap, vg, cr) + } + + // If the volume group snapshot does not exist, reserve the volume group + if vgs == nil { + vgs, err = store.ReserveVolumeGroup(ctx, vg, cr) + if err != nil { + log.ErrorLog(ctx, "failed to reserve volume group: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + } + + inProgress, err := cs.queisceFileSystems(ctx, vgs, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystems: %v", err) + if !errors.Is(err, cerrors.ErrQuiesceInProgress) { + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + } + + return nil, status.Error(codes.Internal, err.Error()) + } + + if inProgress { + return nil, status.Error(codes.Internal, "Quiesce operation is in progress") + } + + resp, err := cs.createSnapshotAddToVolumeGroupJournal(ctx, req, vg, vgs, cr, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to create snapshot and add to volume group journal: %v", err) + + if !errors.Is(err, cerrors.ErrQuiesceInProgress) { + // Handle Undo reservation and timeout as well + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + } + + return nil, status.Error(codes.Internal, err.Error()) + } + + response := &csi.CreateVolumeGroupSnapshotResponse{} + response.GroupSnapshot = &csi.VolumeGroupSnapshot{ + GroupSnapshotId: vgs.VolumeGroupSnapshotID, + ReadyToUse: true, + CreationTime: timestamppb.New(time.Now()), + } + + for _, r := range *resp { + r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID 
+ response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot) + } + + return response, nil +} + +// queisceFileSystems quiesces the subvolumes and subvolume groups present in +// the filesystems of the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. +func (cs *ControllerServer) queisceFileSystems(ctx context.Context, + vgs *store.VolumeGroupSnapshotIdentifier, + fsMap map[string]core.FSQuiesceClient, +) (bool, error) { + var inProgress bool + for _, fm := range fsMap { + // Quiesce the fs, subvolumes and subvolume groups + data, err := fm.FSQuiesce(ctx, vgs.RequestName) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystem: %v", err) + + return inProgress, err + } + state := core.GetQuiesceState(data.State) + if state == core.Quiescing { + inProgress = true + } else if state != core.Quiesced { + return inProgress, fmt.Errorf("quiesce operation is in %s state", state) + } + } + + return inProgress, nil +} + +// releaseQuiesceAndGetVolumeGroupSnapshotResponse releases the quiesce of the +// subvolumes and subvolume groups in the filesystems for the volumeID's +// present in the CreateVolumeGroupSnapshotRequest. +func (cs *ControllerServer) releaseQuiesceAndGetVolumeGroupSnapshotResponse( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, + vgs *store.VolumeGroupSnapshotIdentifier, + fsMap map[string]core.FSQuiesceClient, + vg *store.VolumeGroupOptions, + cr *util.Credentials, +) (*csi.CreateVolumeGroupSnapshotResponse, error) { + matchesSourceVolumeIDs := matchesSourceVolumeIDs(vgs.GetVolumeIDs(), req.GetSourceVolumeIds()) + if !matchesSourceVolumeIDs { + return nil, status.Errorf( + codes.InvalidArgument, + "source volume ids %v do not match in the existing volume group snapshot %v", + req.GetSourceVolumeIds(), + vgs.GetVolumeIDs()) + } + // Release the quiesce of the subvolumes and subvolume groups in the + // filesystems for the volumes. 
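+	// This path is taken when a snapshot already exists for every source
+	// volume (see checkIfFSNeedQuiesceRelease), so only the quiesce release
+	// and the response assembly remain to be done.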
+ for _, fm := range fsMap { + // UnFreeze the filesystems, subvolumes and subvolume groups + data, err := fm.ReleaseFSQuiesce(ctx, vg.RequestName) + if err != nil { + log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err) + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + + return nil, status.Errorf(codes.Internal, "failed to release filesystem quiesce: %v", err) + } + state := core.GetQuiesceState(data.State) + if state != core.Released { + return nil, status.Errorf(codes.Internal, "quiesce operation is in %s state", state) + } + } + var err error + defer func() { + if err != nil && !errors.Is(err, cerrors.ErrQuiesceInProgress) { + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + } + }() + snapshotResponses := make([]csi.CreateSnapshotResponse, 0) + for _, volID := range req.GetSourceVolumeIds() { + // Create the snapshot for the volumeID + clusterID := getClusterIDForVolumeID(fsMap, volID) + if clusterID == "" { + return nil, status.Errorf(codes.Internal, "failed to get clusterID for volumeID %s", volID) + } + + req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName, + clusterID, + req.GetSecrets()) + var resp *csi.CreateSnapshotResponse + resp, err = cs.createSnapshotAndAddMapping(ctx, req, vg, vgs, cr) + if err != nil { + // Handle cleanup + log.ErrorLog(ctx, "failed to create snapshot: %v", err) + + return nil, status.Errorf(codes.Internal, + "failed to create snapshot and add to volume group journal: %v", + err) + } + snapshotResponses = append(snapshotResponses, *resp) + } + + response := &csi.CreateVolumeGroupSnapshotResponse{} + response.GroupSnapshot = &csi.VolumeGroupSnapshot{ + GroupSnapshotId: vgs.VolumeGroupSnapshotID, + ReadyToUse: true, + CreationTime: timestamppb.New(time.Now()), + } + + for _, r := range snapshotResponses { + r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID + response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot) + } + + return response, nil +} + +// createSnapshotAddToVolumeGroupJournal creates the snapshot and adds the +// snapshotID and volumeID to the volume group journal omap. If the freeze is +// true then it will freeze the subvolumes and subvolume groups before creating +// the snapshot and unfreeze them after creating the snapshot. If the freeze is +// false it will call createSnapshot and get the snapshot details for the +// volume and add the snapshotID and volumeID to the volume group journal omap. +// If any error occurs other than ErrInProgress it will delete the snapshots +// and undo the reservation and return the error. 
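+// For every source volume the loop below re-arms the quiesce expiration,
+// creates the snapshot and stores the volumeID -> snapshotID mapping in the
+// group journal; the quiesce is released once all volumes are processed.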
+func (cs *ControllerServer) createSnapshotAddToVolumeGroupJournal( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, + vgo *store.VolumeGroupOptions, + vgs *store.VolumeGroupSnapshotIdentifier, + cr *util.Credentials, + fsMap map[string]core.FSQuiesceClient) ( + *[]csi.CreateSnapshotResponse, + error, +) { + var err error + var resp *csi.CreateSnapshotResponse + + responses := make([]csi.CreateSnapshotResponse, 0) + for _, volID := range req.GetSourceVolumeIds() { + err = fsQuiesceWithExpireTimeout(ctx, vgo.RequestName, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err) + + return nil, err + } + + // Create the snapshot for the volumeID + clusterID := getClusterIDForVolumeID(fsMap, volID) + if clusterID == "" { + return nil, fmt.Errorf("failed to get clusterID for volumeID %s", volID) + } + + req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName, + clusterID, + req.GetSecrets()) + resp, err = cs.createSnapshotAndAddMapping(ctx, req, vgo, vgs, cr) + if err != nil { + // Handle cleanup + log.ErrorLog(ctx, "failed to create snapshot: %v", err) + + return nil, err + } + responses = append(responses, *resp) + } + + err = releaseFSQuiesce(ctx, vgo.RequestName, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err) + + return nil, err + } + + return &responses, nil +} + +func formatCreateSnapshotRequest(volID, groupSnapshotName, + clusterID string, + secret map[string]string, +) *csi.CreateSnapshotRequest { + return &csi.CreateSnapshotRequest{ + SourceVolumeId: volID, + Name: groupSnapshotName + "-" + volID, + Secrets: secret, + Parameters: map[string]string{ + "clusterID": clusterID, + }, + } +} + +// releaseSubvolumeQuiesce releases the quiesce of the subvolumes and subvolume +// groups in the filesystems for the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. +func releaseFSQuiesce(ctx context.Context, + requestName string, + fsMap map[string]core.FSQuiesceClient, +) error { + inProgress := false + var err error + var data *admin.QuiesceInfo + for _, fm := range fsMap { + // UnFreeze the filesystems, subvolumes and subvolume groups + data, err = fm.ReleaseFSQuiesce(ctx, requestName) + if err != nil { + log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err) + + return err + } + state := core.GetQuiesceState(data.State) + if state != core.Released { + inProgress = true + } + } + + if inProgress { + return cerrors.ErrQuiesceInProgress + } + + return nil +} + +// fsQuiesceWithExpireTimeout quiesces the subvolumes and subvolume +// groups in the filesystems for the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. 
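+// Re-issuing the quiesce with the same reservation name refreshes the
+// expiration timer, keeping the members quiesced while the remaining
+// per-volume snapshots are taken.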
+func fsQuiesceWithExpireTimeout(ctx context.Context, + requestName string, + fsMap map[string]core.FSQuiesceClient, +) error { + var err error + + var data *admin.QuiesceInfo + inProgress := false + for _, fm := range fsMap { + // reinitialize the expiry timer for the quiesce + data, err = fm.FSQuiesceWithExpireTimeout(ctx, requestName) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err) + + return err + } + state := core.GetQuiesceState(data.State) + if state == core.Quiescing { + inProgress = true + } else if state != core.Quiesced { + return fmt.Errorf("quiesce operation is in %s state", state) + } + } + + if inProgress { + return cerrors.ErrQuiesceInProgress + } + + return nil +} + +// createSnapshotAndAddMapping creates the snapshot and adds the snapshotID and +// volumeID to the volume group journal omap. If any error occurs it will +// delete the last created snapshot as its still not added to the journal. +func (cs *ControllerServer) createSnapshotAndAddMapping( + ctx context.Context, + req *csi.CreateSnapshotRequest, + vgo *store.VolumeGroupOptions, + vgs *store.VolumeGroupSnapshotIdentifier, + cr *util.Credentials, +) (*csi.CreateSnapshotResponse, error) { + // Create the snapshot + resp, err := cs.CreateSnapshot(ctx, req) + if err != nil { + // Handle cleanup + log.ErrorLog(ctx, "failed to create snapshot: %v", err) + + return nil, err + } + j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, err + } + defer j.Destroy() + // Add the snapshot to the volume group journal + err = j.AddVolumeSnapshotMapping(ctx, + vgo.MetadataPool, + vgs.ReservedID, + req.GetSourceVolumeId(), + resp.GetSnapshot().GetSnapshotId()) + if err != nil { + log.ErrorLog(ctx, "failed to add volume snapshot mapping: %v", err) + // Delete the last created snapshot as its still not added to the + // journal + delReq := &csi.DeleteSnapshotRequest{ + SnapshotId: resp.GetSnapshot().GetSnapshotId(), + Secrets: req.GetSecrets(), + } + _, dErr := cs.DeleteSnapshot(ctx, delReq) + if dErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot %s: %v", resp.GetSnapshot().GetSnapshotId(), dErr) + } + + return nil, err + } + + return resp, nil +} + +// checkIfFSNeedQuiesceRelease checks that do we have snapshots for all the +// volumes stored in the omap so that we can release the quiesce. +func checkIfFSNeedQuiesceRelease(vgs *store.VolumeGroupSnapshotIdentifier, volIDs []string) bool { + if vgs == nil { + return false + } + // If the number of volumes in the snapshot is not equal to the number of volumes + + return len(vgs.GetVolumeIDs()) == len(volIDs) +} + +// getClusterIDForVolumeID gets the clusterID for the volumeID from the fms map. +func getClusterIDForVolumeID(fms map[string]core.FSQuiesceClient, volumeID string) string { + for _, fm := range fms { + for _, vol := range fm.GetVolumes() { + if vol.VolumeID == volumeID { + return vol.ClusterID + } + } + } + + return "" +} + +// getFsNamesAndSubVolumeFromVolumeIDs gets the filesystem names and subvolumes +// from the volumeIDs present in the CreateVolumeGroupSnapshotRequest. It also +// returns the SubVolumeQuiesceClient for the filesystems present in the +// volumeIDs. 
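+// Volumes are grouped per filesystem using "monitors+fsName" as the key, so
+// a single FSQuiesceClient (and cluster connection) is created for each
+// distinct filesystem.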
+func getFsNamesAndSubVolumeFromVolumeIDs(ctx context.Context, + secret map[string]string, + volIDs []string, + cr *util.Credentials) ( + map[string]core.FSQuiesceClient, + error, +) { + type fs struct { + fsName string + volumes []core.Volume + subVolumeGroupMapping map[string][]string + monitors string + } + fm := make(map[string]fs, 0) + for _, volID := range volIDs { + // Find the volume using the provided VolumeID + volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, + volID, nil, secret, "", false) + if err != nil { + return nil, err + } + volOptions.Destroy() + // choosing monitorIP's and fsName as the unique key + // TODO: Need to use something else as the unique key as users can + // still choose the different monitorIP's and fsName for subvolumes + uniqueName := volOptions.Monitors + volOptions.FsName + if _, ok := fm[uniqueName]; !ok { + fm[uniqueName] = fs{ + fsName: volOptions.FsName, + volumes: make([]core.Volume, 0), + subVolumeGroupMapping: make(map[string][]string), // Initialize the map + monitors: volOptions.Monitors, + } + } + a := core.Volume{ + VolumeID: volID, + ClusterID: volOptions.ClusterID, + } + // Retrieve the value, modify it, and assign it back + val := fm[uniqueName] + val.volumes = append(val.volumes, a) + existingVolIDInMap := val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup] + val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup] = append( + existingVolIDInMap, + volOptions.SubVolume.VolID) + fm[uniqueName] = val + } + fsk := map[string]core.FSQuiesceClient{} + var err error + defer func() { + if err != nil { + destroyFSConnections(fsk) + } + }() + for k, v := range fm { + conn := &util.ClusterConnection{} + if err = conn.Connect(v.monitors, cr); err != nil { + return nil, err + } + fsk[k], err = core.NewFSQuiesce(v.fsName, v.volumes, v.subVolumeGroupMapping, conn) + if err != nil { + log.ErrorLog(ctx, "failed to get subvolume quiesce: %v", err) + conn.Destroy() + + return nil, err + } + } + + return fsk, nil +} + +// destroyFSConnections destroys connections of all FSQuiesceClient. +func destroyFSConnections(fsMap map[string]core.FSQuiesceClient) { + for _, fm := range fsMap { + if fm != nil { + fm.Destroy() + } + } +} + +// matchesSourceVolumeIDs checks if the sourceVolumeIDs and volumeIDsInOMap are +// equal. +func matchesSourceVolumeIDs(sourceVolumeIDs, volumeIDsInOMap []string) bool { + // sort the array as its required for slices.Equal call. + sort.Strings(sourceVolumeIDs) + sort.Strings(volumeIDsInOMap) + + return slices.Equal(sourceVolumeIDs, volumeIDsInOMap) +} + +// deleteSnapshotsAndUndoReservation deletes the snapshots and undoes the +// volume group reservation. It also resets the quiesce of the subvolumes and +// subvolume groups in the filesystems for the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. 
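+// Cleanup order: delete each snapshot, remove its volume/snapshot mapping
+// from the group journal omap, undo the group reservation, and reset the
+// quiesce so a later retry can reuse the same reservation name.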
+func (cs *ControllerServer) deleteSnapshotsAndUndoReservation(ctx context.Context, + vgs *store.VolumeGroupSnapshotIdentifier, + cr *util.Credentials, + fsMap map[string]core.FSQuiesceClient, + secrets map[string]string, +) error { + // get the omap from the snapshot and volume mapping + vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, vgs.VolumeGroupSnapshotID, cr) + if err != nil { + log.ErrorLog(ctx, "failed to get volume group options from id: %v", err) + + return err + } + defer vgo.Destroy() + + for volID, snapID := range vgsi.VolumeSnapshotMap { + // delete the snapshots + req := &csi.DeleteSnapshotRequest{ + SnapshotId: snapID, + Secrets: secrets, + } + _, err = cs.DeleteSnapshot(ctx, req) + if err != nil { + log.ErrorLog(ctx, "failed to delete snapshot: %v", err) + + return err + } + + j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return err + } + // remove the entry from the omap + err = j.RemoveVolumeSnapshotMapping( + ctx, + vgo.MetadataPool, + vgsi.ReservedID, + volID) + j.Destroy() + if err != nil { + log.ErrorLog(ctx, "failed to remove volume snapshot mapping: %v", err) + + return err + } + // undo the reservation + err = store.UndoVolumeGroupReservation(ctx, vgo, vgsi, cr) + if err != nil { + log.ErrorLog(ctx, "failed to undo volume group reservation: %v", err) + + return err + } + } + + for _, fm := range fsMap { + _, err := fm.ResetFSQuiesce(ctx, vgs.RequestName) + if err != nil { + log.ErrorLog(ctx, "failed to reset filesystem quiesce: %v", err) + + return err + } + } + + return nil +} + +// validateVolumeGroupSnapshotDeleteRequest validates the request for creating a group +// snapshot of volumes. +func (cs *ControllerServer) validateVolumeGroupSnapshotDeleteRequest( + ctx context.Context, + req *csi.DeleteVolumeGroupSnapshotRequest, +) error { + if err := cs.Driver.ValidateGroupControllerServiceRequest( + csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil { + log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req)) + + return err + } + + // Check sanity of request volume group snapshot Name, Source Volume Id's + if req.GetGroupSnapshotId() == "" { + return status.Error(codes.InvalidArgument, "volume group snapshot id cannot be empty") + } + + return nil +} + +// DeleteVolumeGroupSnapshot deletes a group snapshot of volumes. 
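+// The group snapshot ID is decomposed to look up the journal entry, the
+// per-volume snapshots are deleted and the reservation is undone.
+// ErrPoolNotFound and ErrKeyNotFound are treated as "already deleted" and
+// return success (see extractDeleteVolumeGroupError).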
+func (cs *ControllerServer) DeleteVolumeGroupSnapshot(ctx context.Context, + req *csi.DeleteVolumeGroupSnapshotRequest) ( + *csi.DeleteVolumeGroupSnapshotResponse, + error, +) { + if err := cs.validateVolumeGroupSnapshotDeleteRequest(ctx, req); err != nil { + return nil, err + } + + groupSnapshotID := req.GroupSnapshotId + // Existence and conflict checks + if acquired := cs.VolumeGroupLocks.TryAcquire(groupSnapshotID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID) + } + defer cs.VolumeGroupLocks.Release(groupSnapshotID) + + cr, err := util.NewAdminCredentials(req.GetSecrets()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, req.GroupSnapshotId, cr) + if err != nil { + log.ErrorLog(ctx, "failed to get volume group options: %v", err) + err = extractDeleteVolumeGroupError(err) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.DeleteVolumeGroupSnapshotResponse{}, nil + } + vgo.Destroy() + + volIds := vgsi.GetVolumeIDs() + fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), volIds, cr) + err = extractDeleteVolumeGroupError(err) + if err != nil { + log.ErrorLog(ctx, "failed to get volume group options: %v", err) + err = extractDeleteVolumeGroupError(err) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.DeleteVolumeGroupSnapshotResponse{}, nil + } + + defer destroyFSConnections(fsMap) + + err = cs.deleteSnapshotsAndUndoReservation(ctx, vgsi, cr, fsMap, req.GetSecrets()) + if err != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", err) + err = extractDeleteVolumeGroupError(err) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return &csi.DeleteVolumeGroupSnapshotResponse{}, nil + } + + return &csi.DeleteVolumeGroupSnapshotResponse{}, nil +} + +// extractDeleteVolumeGroupError extracts the error from the delete volume +// group snapshot and returns the error if it is not a ErrKeyNotFound or +// ErrPoolNotFound error. +func extractDeleteVolumeGroupError(err error) error { + switch { + case errors.Is(err, util.ErrPoolNotFound): + // if error is ErrPoolNotFound, the pool is already deleted we dont + // need to worry about deleting snapshot or omap data, return success + return nil + case errors.Is(err, util.ErrKeyNotFound): + // if error is ErrKeyNotFound, then a previous attempt at deletion was complete + // or partially complete (snap and snapOMap are garbage collected already), hence return + // success as deletion is complete + return nil + } + + return err +} diff --git a/internal/cephfs/groupcontrollerserver_test.go b/internal/cephfs/groupcontrollerserver_test.go new file mode 100644 index 00000000000..eae8d834a38 --- /dev/null +++ b/internal/cephfs/groupcontrollerserver_test.go @@ -0,0 +1,121 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "context" + "testing" + + csicommon "github.com/ceph/ceph-csi/internal/csi-common" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestControllerServer_validateCreateVolumeGroupSnapshotRequest(t *testing.T) { + t.Parallel() + cs := ControllerServer{ + DefaultControllerServer: csicommon.NewDefaultControllerServer( + csicommon.NewCSIDriver("cephfs.csi.ceph.com", "1.0.0", "test")), + } + + type args struct { + ctx context.Context + req *csi.CreateVolumeGroupSnapshotRequest + } + tests := []struct { + name string + args args + wantErr bool + code codes.Code + }{ + { + "valid CreateVolumeGroupSnapshotRequest", + args{ + context.Background(), &csi.CreateVolumeGroupSnapshotRequest{ + Name: "vg-snap-1", + SourceVolumeIds: []string{"vg-1"}, + Parameters: map[string]string{ + "clusterID": "value", + "fsName": "value", + }, + }, + }, + false, + codes.OK, + }, + { + "empty request name in CreateVolumeGroupSnapshotRequest", + args{ + context.Background(), &csi.CreateVolumeGroupSnapshotRequest{ + SourceVolumeIds: []string{"vg-1"}, + }, + }, + true, + codes.InvalidArgument, + }, + { + "empty SourceVolumeIds in CreateVolumeGroupSnapshotRequest", + args{ + context.Background(), &csi.CreateVolumeGroupSnapshotRequest{ + Name: "vg-snap-1", + SourceVolumeIds: []string{"vg-1"}, + }, + }, + true, + codes.InvalidArgument, + }, + { + "empty clusterID in CreateVolumeGroupSnapshotRequest", + args{ + context.Background(), &csi.CreateVolumeGroupSnapshotRequest{ + Name: "vg-snap-1", + SourceVolumeIds: []string{"vg-1"}, + Parameters: map[string]string{"fsName": "value"}, + }, + }, + true, + codes.InvalidArgument, + }, + { + "empty fsName in CreateVolumeGroupSnapshotRequest", + args{ + context.Background(), &csi.CreateVolumeGroupSnapshotRequest{ + Name: "vg-snap-1", + SourceVolumeIds: []string{"vg-1"}, + Parameters: map[string]string{"clusterID": "value"}, + }, + }, + true, + codes.InvalidArgument, + }, + } + for _, tt := range tests { + ts := tt + t.Run(ts.name, func(t *testing.T) { + t.Parallel() + err := cs.validateCreateVolumeGroupSnapshotRequest(ts.args.ctx, ts.args.req) + if ts.wantErr { + c := status.Code(err) + if c != ts.code { + t.Errorf("ControllerServer.validateVolumeGroupSnapshotRequest() error = %v, want code %v", err, c) + } + } + }) + } +} diff --git a/internal/cephfs/identityserver.go b/internal/cephfs/identityserver.go index 625fc38424e..b2a041a3272 100644 --- a/internal/cephfs/identityserver.go +++ b/internal/cephfs/identityserver.go @@ -58,6 +58,13 @@ func (is *IdentityServer) GetPluginCapabilities( }, }, }, + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_GROUP_CONTROLLER_SERVICE, + }, + }, + }, }, }, nil } diff --git a/internal/cephfs/store/fsjournal.go b/internal/cephfs/store/fsjournal.go index a8fbcdb5015..c9f9a16d7e9 100644 --- a/internal/cephfs/store/fsjournal.go +++ b/internal/cephfs/store/fsjournal.go @@ -40,6 +40,10 @@ var ( // SnapJournal is used to maintain RADOS based journals for CO generated. // SnapshotName to backing CephFS subvolumes. SnapJournal *journal.Config + + // VolumeGroupJournal is used to maintain RADOS based journals for CO + // generate request name to CephFS snapshot group attributes. 
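+	// Per reserved UUID the journal stores the request name, the generated
+	// group snapshot name and the volumeID -> snapshotID mapping.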
+ VolumeGroupJournal journal.VolumeGroupJournalConfig ) // VolumeIdentifier structure contains an association between the CSI VolumeID to its subvolume diff --git a/internal/cephfs/store/volumegroup.go b/internal/cephfs/store/volumegroup.go new file mode 100644 index 00000000000..4286ad76fac --- /dev/null +++ b/internal/cephfs/store/volumegroup.go @@ -0,0 +1,285 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "context" + "fmt" + + "github.com/ceph/ceph-csi/internal/cephfs/core" + cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" + fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +type VolumeGroupOptions struct { + *VolumeOptions +} + +// NewVolumeGroupOptions generates a new instance of volumeGroupOptions from the provided +// CSI request parameters. +func NewVolumeGroupOptions( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, + cr *util.Credentials, +) (*VolumeGroupOptions, error) { + var ( + opts = &VolumeGroupOptions{} + err error + ) + + volOptions := req.GetParameters() + opts.VolumeOptions, err = getVolumeOptions(volOptions) + if err != nil { + return nil, err + } + + if err = extractOptionalOption(&opts.NamePrefix, "volumeGroupNamePrefix", volOptions); err != nil { + return nil, err + } + + opts.RequestName = req.GetName() + + err = opts.Connect(cr) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + opts.Destroy() + } + }() + + fs := core.NewFileSystem(opts.conn) + opts.FscID, err = fs.GetFscID(ctx, opts.FsName) + if err != nil { + return nil, err + } + + opts.MetadataPool, err = fs.GetMetadataPool(ctx, opts.FsName) + if err != nil { + return nil, err + } + + return opts, nil +} + +type VolumeGroupSnapshotIdentifier struct { + ReservedID string + FsVolumeGroupSnapshotName string + VolumeGroupSnapshotID string + RequestName string + VolumeSnapshotMap map[string]string +} + +// GetVolumeIDs returns the list of volumeIDs in the VolumeSnaphotMap. +func (vgsi *VolumeGroupSnapshotIdentifier) GetVolumeIDs() []string { + keys := make([]string, 0, len(vgsi.VolumeSnapshotMap)) + for k := range vgsi.VolumeSnapshotMap { + keys = append(keys, k) + } + + return keys +} + +// NewVolumeGroupOptionsFromID generates a new instance of volumeGroupOptions and GroupIdentifier +// from the provided CSI volumeGroupSnapshotID. 
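+// The ID is decomposed to recover the clusterID, the filesystem ID and the
+// reserved UUID, which are then used to read the group attributes (request
+// name, group name and volume/snapshot map) back from the journal.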
+func NewVolumeGroupOptionsFromID( + ctx context.Context, + volumeGroupSnapshotID string, + cr *util.Credentials, +) (*VolumeGroupOptions, *VolumeGroupSnapshotIdentifier, error) { + var ( + vi util.CSIIdentifier + volOptions = &VolumeGroupOptions{} + vgs VolumeGroupSnapshotIdentifier + ) + // Decode the snapID first, to detect pre-provisioned snapshot before other errors + err := vi.DecomposeCSIID(volumeGroupSnapshotID) + if err != nil { + return nil, nil, cerrors.ErrInvalidVolID + } + volOptions.VolumeOptions = &VolumeOptions{} + volOptions.ClusterID = vi.ClusterID + vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID + volOptions.FscID = vi.LocationID + vgs.ReservedID = vi.ObjectUUID + + if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil { + return nil, nil, fmt.Errorf( + "failed to fetch monitor list using clusterID (%s): %w", + vi.ClusterID, + err) + } + + err = volOptions.Connect(cr) + if err != nil { + return nil, nil, err + } + // in case of an error, volOptions is returned, but callers may not + // expect to need to call Destroy() on it. So, make sure to release any + // resources that may have been allocated + defer func() { + if err != nil { + volOptions.Destroy() + } + }() + + fs := core.NewFileSystem(volOptions.conn) + volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID) + if err != nil { + return nil, nil, err + } + + volOptions.MetadataPool, err = fs.GetMetadataPool(ctx, volOptions.FsName) + if err != nil { + return nil, nil, err + } + + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, nil, err + } + defer j.Destroy() + + groupAttributes, err := j.GetVolumeGroupAttributes( + ctx, volOptions.MetadataPool, vi.ObjectUUID) + if err != nil { + return nil, nil, err + } + + vgs.RequestName = groupAttributes.RequestName + vgs.FsVolumeGroupSnapshotName = groupAttributes.GroupName + vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID + vgs.VolumeSnapshotMap = groupAttributes.VolumeSnapshotMap + + return volOptions, &vgs, nil +} + +/* +CheckVolumeGroupSnapExists checks to determine if passed in RequestName in +volGroupOptions exists on the backend. + +**NOTE:** These functions manipulate the rados omaps that hold information +regarding volume group snapshot names as requested by the CSI drivers. Hence, +these need to be invoked only when the respective CSI driver generated volume +group snapshot name based locks are held, as otherwise racy access to these +omaps may end up leaving them in an inconsistent state. +*/ +func CheckVolumeGroupSnapExists( + ctx context.Context, + volOptions *VolumeGroupOptions, + cr *util.Credentials, +) (*VolumeGroupSnapshotIdentifier, error) { + // Connect to cephfs' default radosNamespace (csi) + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, err + } + defer j.Destroy() + + volGroupData, err := j.CheckReservation( + ctx, volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix) + if err != nil { + return nil, err + } + if volGroupData == nil { + return nil, nil + } + vgs := &VolumeGroupSnapshotIdentifier{} + vgs.RequestName = volOptions.RequestName + vgs.ReservedID = volGroupData.GroupUUID + vgs.FsVolumeGroupSnapshotName = volGroupData.GroupName + vgs.VolumeSnapshotMap = volGroupData.VolumeGroupAttributes.VolumeSnapshotMap + + // found a snapshot already available, process and return it! 
+ vgs.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, + "", volOptions.ClusterID, volGroupData.GroupUUID) + if err != nil { + return nil, err + } + log.DebugLog(ctx, "Found existing volume group snapshot (%s) with UUID (%s) for request (%s) and mapping %v", + vgs.RequestName, volGroupData.GroupUUID, vgs.RequestName, vgs.VolumeSnapshotMap) + + return vgs, nil +} + +// ReserveVolumeGroup is a helper routine to request a UUID reservation for the +// CSI request name and, +// to generate the volumegroup snapshot identifier for the reserved UUID. +func ReserveVolumeGroup( + ctx context.Context, + volOptions *VolumeGroupOptions, + cr *util.Credentials, +) (*VolumeGroupSnapshotIdentifier, error) { + var ( + vgsi VolumeGroupSnapshotIdentifier + groupUUID string + err error + ) + + vgsi.RequestName = volOptions.RequestName + // Connect to cephfs' default radosNamespace (csi) + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, err + } + defer j.Destroy() + + groupUUID, vgsi.FsVolumeGroupSnapshotName, err = j.ReserveName( + ctx, volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName, volOptions.NamePrefix) + if err != nil { + return nil, err + } + + // generate the snapshot ID to return to the CO system + vgsi.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, + "", volOptions.ClusterID, groupUUID) + if err != nil { + return nil, err + } + + log.DebugLog(ctx, "Generated volume group snapshot ID (%s) for request name (%s)", + vgsi.VolumeGroupSnapshotID, volOptions.RequestName) + + return &vgsi, nil +} + +// UndoVolumeGroupReservation is a helper routine to undo a name reservation +// for a CSI volumeGroupSnapshot name. +func UndoVolumeGroupReservation( + ctx context.Context, + volOptions *VolumeGroupOptions, + vgsi *VolumeGroupSnapshotIdentifier, + cr *util.Credentials, +) error { + // Connect to cephfs' default radosNamespace (csi) + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return err + } + defer j.Destroy() + + err = j.UndoReservation(ctx, volOptions.MetadataPool, + vgsi.FsVolumeGroupSnapshotName, vgsi.RequestName) + + return err +} diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index 068afd72597..5ed493e86fa 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -209,10 +209,32 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error { optName, actual, expected) } +// getVolumeOptions validates the basic required basic options provided in the +// volume parameters and extract the volumeOptions from volume parameters. +// It contains the following checks: +// - clusterID must be set +// - monitors must be set +// - fsName must be set. +func getVolumeOptions(vo map[string]string) (*VolumeOptions, error) { + opts := VolumeOptions{} + clusterData, err := GetClusterInformation(vo) + if err != nil { + return nil, err + } + + opts.ClusterID = clusterData.ClusterID + opts.Monitors = strings.Join(clusterData.Monitors, ",") + opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup + + if err = extractOption(&opts.FsName, "fsName", vo); err != nil { + return nil, err + } + + return &opts, nil +} + // NewVolumeOptions generates a new instance of volumeOptions from the provided // CSI request parameters. 
-// -//nolint:gocyclo,cyclop // TODO: reduce complexity func NewVolumeOptions( ctx context.Context, requestName, @@ -222,20 +244,17 @@ func NewVolumeOptions( cr *util.Credentials, ) (*VolumeOptions, error) { var ( - opts VolumeOptions + opts *VolumeOptions backingSnapshotBool string err error ) volOptions := req.GetParameters() - clusterData, err := GetClusterInformation(volOptions) + opts, err = getVolumeOptions(volOptions) if err != nil { return nil, err } - opts.ClusterID = clusterData.ClusterID - opts.Monitors = strings.Join(clusterData.Monitors, ",") - opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup opts.Owner = k8s.GetOwner(volOptions) opts.BackingSnapshot = IsShallowVolumeSupported(req) @@ -247,10 +266,6 @@ func NewVolumeOptions( return nil, err } - if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil { - return nil, err - } - if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", volOptions); err != nil { return nil, err } @@ -323,7 +338,7 @@ func NewVolumeOptions( } } - return &opts, nil + return opts, nil } // IsShallowVolumeSupported returns true only for ReadOnly volume requests diff --git a/internal/csi-common/controllerserver-default.go b/internal/csi-common/controllerserver-default.go index 2ce290928b9..114638056e1 100644 --- a/internal/csi-common/controllerserver-default.go +++ b/internal/csi-common/controllerserver-default.go @@ -29,6 +29,7 @@ import ( // DefaultControllerServer points to default driver. type DefaultControllerServer struct { csi.UnimplementedControllerServer + csi.UnimplementedGroupControllerServer Driver *CSIDriver } diff --git a/internal/journal/volumegroupjournal.go b/internal/journal/volumegroupjournal.go index 1498748d389..75f06a3554c 100644 --- a/internal/journal/volumegroupjournal.go +++ b/internal/journal/volumegroupjournal.go @@ -32,15 +32,7 @@ const ( ) type VolumeGroupJournal interface { - // Connect establishes a new connection to a ceph cluster for journal metadata. - Connect( - monitors, - namespace string, - cr *util.Credentials) error - // Destroy frees any resources and invalidates the journal connection. Destroy() - // SetNamespace sets the namespace for the journal. - SetNamespace(ns string) CheckReservation( ctx context.Context, journalPool, @@ -78,16 +70,20 @@ type VolumeGroupJournal interface { volumeID string) error } -// volumeGroupJournalConfig contains the configuration and connection details. -type volumeGroupJournalConfig struct { - *Config - *Connection +// VolumeGroupJournalConfig contains the configuration. +type VolumeGroupJournalConfig struct { + Config +} + +type VolumeGroupJournalConnection struct { + config *VolumeGroupJournalConfig + connection *Connection } // NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups. -func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { - return &volumeGroupJournalConfig{ - Config: &Config{ +func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig { + return VolumeGroupJournalConfig{ + Config: Config{ csiDirectory: "csi.groups." + suffix, csiNameKeyPrefix: "csi.volume.group.", cephUUIDDirectoryPrefix: "csi.volume.group.", @@ -98,35 +94,42 @@ func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { } } -func (sgj *volumeGroupJournalConfig) SetNamespace(ns string) { - sgj.Config.namespace = ns +// SetNamespace sets the namespace for the journal. 
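+// SetNamespace only mutates the configuration; connections created from it
+// afterwards via Connect inherit the namespace. Illustrative use (vgj is an
+// assumed VolumeGroupJournalConfig):
+//
+//	j, err := vgj.Connect(monitors, radosNamespace, cr)
+//	if err != nil {
+//		return err
+//	}
+//	defer j.Destroy()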
+func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) { + vgc.Config.namespace = ns } // NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for // volume groups using a predetermined namespace value. -func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal { +func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournalConfig { j := NewCSIVolumeGroupJournal(suffix) j.SetNamespace(ns) return j } -func (sgj *volumeGroupJournalConfig) Connect( +// Connect establishes a new connection to a ceph cluster for journal metadata. +func (vgc *VolumeGroupJournalConfig) Connect( monitors, namespace string, cr *util.Credentials, -) error { - conn, err := sgj.Config.Connect(monitors, namespace, cr) +) (VolumeGroupJournal, error) { + vgjc := &VolumeGroupJournalConnection{} + vgjc.config = &VolumeGroupJournalConfig{ + Config: vgc.Config, + } + conn, err := vgc.Config.Connect(monitors, namespace, cr) if err != nil { - return err + return nil, err } - sgj.Connection = conn + vgjc.connection = conn - return nil + return vgjc, nil } -func (sgj *volumeGroupJournalConfig) Destroy() { - sgj.Connection.Destroy() +// Destroy frees any resources and invalidates the journal connection. +func (vgjc *VolumeGroupJournalConnection) Destroy() { + vgjc.connection.Destroy() } // VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a @@ -162,11 +165,11 @@ Return values: reservation found. - error: non-nil in case of any errors. */ -func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) CheckReservation(ctx context.Context, journalPool, reqName, namePrefix string, ) (*VolumeGroupData, error) { var ( - cj = sgj.Config + cj = vgjc.config volGroupData = &VolumeGroupData{} ) @@ -175,7 +178,7 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, cj.csiNameKeyPrefix + reqName, } values, err := getOMapValues( - ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory, cj.commonPrefix, fetchKeys) if err != nil { if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) { @@ -195,13 +198,13 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, } volGroupData.GroupUUID = objUUID - savedVolumeGroupAttributes, err := sgj.GetVolumeGroupAttributes(ctx, journalPool, + savedVolumeGroupAttributes, err := vgjc.GetVolumeGroupAttributes(ctx, journalPool, objUUID) if err != nil { // error should specifically be not found, for image to be absent, any other error // is not conclusive, and we should not proceed if errors.Is(err, util.ErrKeyNotFound) { - err = sgj.UndoReservation(ctx, journalPool, + err = vgjc.UndoReservation(ctx, journalPool, generateVolumeGroupName(namePrefix, objUUID), reqName) } @@ -239,11 +242,11 @@ Input arguments: - groupID: ID of the volume group, generated from the UUID - reqName: Request name for the volume group */ -func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) UndoReservation(ctx context.Context, csiJournalPool, groupID, reqName string, ) error { // delete volume UUID omap (first, inverse of create order) - cj := sgj.Config + cj := vgjc.config if groupID != "" { if len(groupID) < uuidEncodedLength { return fmt.Errorf("unable to parse UUID from %s, too short", groupID) @@ -256,8 +259,8 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx 
context.Context, err := util.RemoveObject( ctx, - sgj.Connection.monitors, - sgj.Connection.cr, + vgjc.connection.monitors, + vgjc.connection.cr, csiJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+groupUUID) @@ -271,7 +274,7 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, } // delete the request name key (last, inverse of create order) - err := removeMapKeys(ctx, sgj.Connection, csiJournalPool, cj.namespace, cj.csiDirectory, + err := removeMapKeys(ctx, vgjc.connection, csiJournalPool, cj.namespace, cj.csiDirectory, []string{cj.csiNameKeyPrefix + reqName}) if err != nil { log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err) @@ -299,11 +302,11 @@ Return values: - string: Contains the VolumeGroup name that was reserved for the passed in reqName - error: non-nil in case of any errors */ -func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) ReserveName(ctx context.Context, journalPool string, journalPoolID int64, reqName, namePrefix string, ) (string, string, error) { - cj := sgj.Config + cj := vgjc.config // Create the UUID based omap first, to reserve the same and avoid conflicts // NOTE: If any service loss occurs post creation of the UUID directory, and before @@ -311,8 +314,8 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, // UUID directory key will be leaked objUUID, err := reserveOMapName( ctx, - sgj.Connection.monitors, - sgj.Connection.cr, + vgjc.connection.monitors, + vgjc.connection.cr, journalPool, cj.namespace, cj.cephUUIDDirectoryPrefix, @@ -325,7 +328,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, // After generating the UUID Directory omap, we populate the csiDirectory // omap with a key-value entry to map the request to the backend volume group: // `csiNameKeyPrefix + reqName: nameKeyVal` - err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory, map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal}) if err != nil { return "", "", err @@ -333,7 +336,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, defer func() { if err != nil { log.WarningLog(ctx, "reservation failed for volume group: %s", reqName) - errDefer := sgj.UndoReservation(ctx, journalPool, groupName, reqName) + errDefer := vgjc.UndoReservation(ctx, journalPool, groupName, reqName) if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer) } @@ -347,7 +350,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, omapValues[cj.csiNameKey] = reqName omapValues[cj.csiImageKey] = groupName - err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, oid, omapValues) + err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, oid, omapValues) if err != nil { return "", "", err } @@ -363,18 +366,18 @@ type VolumeGroupAttributes struct { VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping } -func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( +func (vgjc *VolumeGroupJournalConnection) GetVolumeGroupAttributes( ctx context.Context, pool, objectUUID string, ) (*VolumeGroupAttributes, error) { var ( err error groupAttributes = &VolumeGroupAttributes{} - cj = sgj.Config + cj = vgjc.config ) values, err := listOMapValues( - ctx, sgj.Connection, pool, 
cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, + ctx, vgjc.connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.commonPrefix) if err != nil { if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) { @@ -398,14 +401,14 @@ func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( return groupAttributes, nil } -func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( +func (vgjc *VolumeGroupJournalConnection) AddVolumeSnapshotMapping( ctx context.Context, pool, reservedUUID, volumeID, snapshotID string, ) error { - err := setOMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + err := setOMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, vgjc.config.cephUUIDDirectoryPrefix+reservedUUID, map[string]string{volumeID: snapshotID}) if err != nil { log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err) @@ -416,13 +419,14 @@ func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( return nil } -func (sgj *volumeGroupJournalConfig) RemoveVolumeSnapshotMapping( +func (vgjc *VolumeGroupJournalConnection) RemoveVolumeSnapshotMapping( ctx context.Context, pool, reservedUUID, volumeID string, ) error { - err := removeMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + err := removeMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, + vgjc.config.cephUUIDDirectoryPrefix+reservedUUID, []string{volumeID}) if err != nil { log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err) diff --git a/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go b/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go index a18c83304f9..25055d6b818 100644 --- a/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go +++ b/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go @@ -12547,6 +12547,9 @@ var awsPartition = partition{ endpointKey{ Region: "eu-south-1", }: endpoint{}, + endpointKey{ + Region: "eu-south-2", + }: endpoint{}, endpointKey{ Region: "eu-west-1", }: endpoint{}, @@ -14554,6 +14557,9 @@ var awsPartition = partition{ endpointKey{ Region: "ca-central-1", }: endpoint{}, + endpointKey{ + Region: "ca-west-1", + }: endpoint{}, endpointKey{ Region: "eu-central-1", }: endpoint{}, @@ -19213,66 +19219,222 @@ var awsPartition = partition{ endpointKey{ Region: "af-south-1", }: endpoint{}, + endpointKey{ + Region: "af-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.af-south-1.api.aws", + }, endpointKey{ Region: "ap-east-1", }: endpoint{}, + endpointKey{ + Region: "ap-east-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-east-1.api.aws", + }, endpointKey{ Region: "ap-northeast-1", }: endpoint{}, + endpointKey{ + Region: "ap-northeast-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-northeast-1.api.aws", + }, endpointKey{ Region: "ap-northeast-2", }: endpoint{}, + endpointKey{ + Region: "ap-northeast-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-northeast-2.api.aws", + }, endpointKey{ Region: "ap-northeast-3", }: endpoint{}, + endpointKey{ + Region: "ap-northeast-3", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-northeast-3.api.aws", + }, endpointKey{ Region: "ap-south-1", }: endpoint{}, + endpointKey{ + Region: "ap-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-south-1.api.aws", + }, endpointKey{ Region: "ap-south-2", }: endpoint{}, + endpointKey{ + Region: 
"ap-south-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-south-2.api.aws", + }, endpointKey{ Region: "ap-southeast-1", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-1.api.aws", + }, endpointKey{ Region: "ap-southeast-2", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-2.api.aws", + }, endpointKey{ Region: "ap-southeast-3", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-3", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-3.api.aws", + }, endpointKey{ Region: "ap-southeast-4", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-4", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-4.api.aws", + }, endpointKey{ Region: "ca-central-1", }: endpoint{}, + endpointKey{ + Region: "ca-central-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ca-central-1.api.aws", + }, + endpointKey{ + Region: "ca-central-1", + Variant: fipsVariant, + }: endpoint{ + Hostname: "logs-fips.ca-central-1.amazonaws.com", + }, endpointKey{ Region: "ca-west-1", }: endpoint{}, + endpointKey{ + Region: "ca-west-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ca-west-1.api.aws", + }, + endpointKey{ + Region: "ca-west-1", + Variant: fipsVariant, + }: endpoint{ + Hostname: "logs-fips.ca-west-1.amazonaws.com", + }, endpointKey{ Region: "eu-central-1", }: endpoint{}, + endpointKey{ + Region: "eu-central-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-central-1.api.aws", + }, endpointKey{ Region: "eu-central-2", }: endpoint{}, + endpointKey{ + Region: "eu-central-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-central-2.api.aws", + }, endpointKey{ Region: "eu-north-1", }: endpoint{}, + endpointKey{ + Region: "eu-north-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-north-1.api.aws", + }, endpointKey{ Region: "eu-south-1", }: endpoint{}, + endpointKey{ + Region: "eu-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-south-1.api.aws", + }, endpointKey{ Region: "eu-south-2", }: endpoint{}, + endpointKey{ + Region: "eu-south-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-south-2.api.aws", + }, endpointKey{ Region: "eu-west-1", }: endpoint{}, + endpointKey{ + Region: "eu-west-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-west-1.api.aws", + }, endpointKey{ Region: "eu-west-2", }: endpoint{}, + endpointKey{ + Region: "eu-west-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-west-2.api.aws", + }, endpointKey{ Region: "eu-west-3", }: endpoint{}, + endpointKey{ + Region: "eu-west-3", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-west-3.api.aws", + }, + endpointKey{ + Region: "fips-ca-central-1", + }: endpoint{ + Hostname: "logs-fips.ca-central-1.amazonaws.com", + CredentialScope: credentialScope{ + Region: "ca-central-1", + }, + Deprecated: boxedTrue, + }, + endpointKey{ + Region: "fips-ca-west-1", + }: endpoint{ + Hostname: "logs-fips.ca-west-1.amazonaws.com", + CredentialScope: credentialScope{ + Region: "ca-west-1", + }, + Deprecated: boxedTrue, + }, endpointKey{ Region: "fips-us-east-1", }: endpoint{ @@ -19312,18 +19474,48 @@ var awsPartition = partition{ endpointKey{ Region: "il-central-1", }: endpoint{}, + endpointKey{ + Region: "il-central-1", + Variant: dualStackVariant, + }: 
endpoint{ + Hostname: "logs.il-central-1.api.aws", + }, endpointKey{ Region: "me-central-1", }: endpoint{}, + endpointKey{ + Region: "me-central-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.me-central-1.api.aws", + }, endpointKey{ Region: "me-south-1", }: endpoint{}, + endpointKey{ + Region: "me-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.me-south-1.api.aws", + }, endpointKey{ Region: "sa-east-1", }: endpoint{}, + endpointKey{ + Region: "sa-east-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.sa-east-1.api.aws", + }, endpointKey{ Region: "us-east-1", }: endpoint{}, + endpointKey{ + Region: "us-east-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-east-1.api.aws", + }, endpointKey{ Region: "us-east-1", Variant: fipsVariant, @@ -19333,6 +19525,12 @@ var awsPartition = partition{ endpointKey{ Region: "us-east-2", }: endpoint{}, + endpointKey{ + Region: "us-east-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-east-2.api.aws", + }, endpointKey{ Region: "us-east-2", Variant: fipsVariant, @@ -19342,6 +19540,12 @@ var awsPartition = partition{ endpointKey{ Region: "us-west-1", }: endpoint{}, + endpointKey{ + Region: "us-west-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-west-1.api.aws", + }, endpointKey{ Region: "us-west-1", Variant: fipsVariant, @@ -19351,6 +19555,12 @@ var awsPartition = partition{ endpointKey{ Region: "us-west-2", }: endpoint{}, + endpointKey{ + Region: "us-west-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-west-2.api.aws", + }, endpointKey{ Region: "us-west-2", Variant: fipsVariant, diff --git a/vendor/github.com/aws/aws-sdk-go/aws/version.go b/vendor/github.com/aws/aws-sdk-go/aws/version.go index ed0729d446b..554b0ebde8b 100644 --- a/vendor/github.com/aws/aws-sdk-go/aws/version.go +++ b/vendor/github.com/aws/aws-sdk-go/aws/version.go @@ -5,4 +5,4 @@ package aws const SDKName = "aws-sdk-go" // SDKVersion is the version of this SDK -const SDKVersion = "1.50.26" +const SDKVersion = "1.50.32" diff --git a/vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go b/vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go new file mode 100644 index 00000000000..3c93dcc8c26 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go @@ -0,0 +1,135 @@ +//go:build ceph_preview + +package admin + +import "fmt" + +// fixedPointFloat is a custom type that implements the MarshalJSON interface. +// This is used to format float64 values to two decimal places. +// By default these get converted to integers in the JSON output and +// fail the command. +type fixedPointFloat float64 + +// MarshalJSON provides a custom implementation for the JSON marshalling +// of fixedPointFloat. It formats the float to two decimal places. +func (fpf fixedPointFloat) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%.2f", float64(fpf))), nil +} + +// fSQuiesceFields is the internal type used to create JSON for ceph. +// See FSQuiesceOptions for the type that users of the library +// interact with. 
+type fSQuiesceFields struct { + Prefix string `json:"prefix"` + VolName string `json:"vol_name"` + GroupName string `json:"group_name,omitempty"` + Members []string `json:"members,omitempty"` + SetId string `json:"set_id,omitempty"` + Timeout fixedPointFloat `json:"timeout,omitempty"` + Expiration fixedPointFloat `json:"expiration,omitempty"` + AwaitFor fixedPointFloat `json:"await_for,omitempty"` + Await bool `json:"await,omitempty"` + IfVersion int `json:"if_version,omitempty"` + Include bool `json:"include,omitempty"` + Exclude bool `json:"exclude,omitempty"` + Reset bool `json:"reset,omitempty"` + Release bool `json:"release,omitempty"` + Query bool `json:"query,omitempty"` + All bool `json:"all,omitempty"` + Cancel bool `json:"cancel,omitempty"` +} + +// FSQuiesceOptions are used to specify optional, non-identifying, values +// to be used when quiescing a cephfs volume. +type FSQuiesceOptions struct { + Timeout float64 + Expiration float64 + AwaitFor float64 + Await bool + IfVersion int + Include bool + Exclude bool + Reset bool + Release bool + Query bool + All bool + Cancel bool +} + +// toFields is used to convert the FSQuiesceOptions to the internal +// fSQuiesceFields type. +func (o *FSQuiesceOptions) toFields(volume, group string, subvolumes []string, setId string) *fSQuiesceFields { + return &fSQuiesceFields{ + Prefix: "fs quiesce", + VolName: volume, + GroupName: group, + Members: subvolumes, + SetId: setId, + Timeout: fixedPointFloat(o.Timeout), + Expiration: fixedPointFloat(o.Expiration), + AwaitFor: fixedPointFloat(o.AwaitFor), + Await: o.Await, + IfVersion: o.IfVersion, + Include: o.Include, + Exclude: o.Exclude, + Reset: o.Reset, + Release: o.Release, + Query: o.Query, + All: o.All, + Cancel: o.Cancel, + } +} + +// QuiesceState is used to report the state of a quiesced fs volume. +type QuiesceState struct { + Name string `json:"name"` + Age float64 `json:"age"` +} + +// QuiesceInfoMember is used to report the state of a quiesced fs volume. +// This is part of sets members object array in the json. +type QuiesceInfoMember struct { + Excluded bool `json:"excluded"` + State QuiesceState `json:"state"` +} + +// QuiesceInfo reports various informational values about a quiesced volume. +// This is returned as sets object array in the json. +type QuiesceInfo struct { + Version int `json:"version"` + AgeRef float64 `json:"age_ref"` + State QuiesceState `json:"state"` + Timeout float64 `json:"timeout"` + Expiration float64 `json:"expiration"` + Members map[string]QuiesceInfoMember `json:"members"` +} + +// FSQuiesceInfo reports various informational values about quiesced volumes. +type FSQuiesceInfo struct { + Epoch int `json:"epoch"` + SetVersion int `json:"set_version"` + Sets map[string]QuiesceInfo `json:"sets"` +} + +// parseFSQuiesceInfo is used to parse the response from the quiesce command. It returns a FSQuiesceInfo object. +func parseFSQuiesceInfo(res response) (*FSQuiesceInfo, error) { + var info FSQuiesceInfo + if err := res.NoStatus().Unmarshal(&info).End(); err != nil { + return nil, err + } + return &info, nil +} + +// FSQuiesce will quiesce the specified subvolumes in a volume. +// Quiescing a fs will prevent new writes to the subvolumes. 
+// Similar To:
+//
+//	ceph fs quiesce
+func (fsa *FSAdmin) FSQuiesce(volume, group string, subvolumes []string, setId string, o *FSQuiesceOptions) (*FSQuiesceInfo, error) {
+	if o == nil {
+		o = &FSQuiesceOptions{}
+	}
+	f := o.toFields(volume, group, subvolumes, setId)
+
+	return parseFSQuiesceInfo(fsa.marshalMgrCommand(f))
+}
diff --git a/vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go b/vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go
new file mode 100644
index 00000000000..88f975d058b
--- /dev/null
+++ b/vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go
@@ -0,0 +1,53 @@
+//go:build ceph_preview
+
+package rbd
+
+// #cgo LDFLAGS: -lrbd
+// #include <rbd/librbd.h>
+import "C"
+
+// SnapGroupNamespace provides details about a single snapshot that was taken
+// as part of an RBD group.
+type SnapGroupNamespace struct {
+	Pool          uint64
+	GroupName     string
+	GroupSnapName string
+}
+
+// GetSnapGroupNamespace returns the SnapGroupNamespace of the snapshot which
+// is part of a group. The caller should make sure that the snapshot ID passed
+// in this function belongs to a snapshot that was taken as part of a group
+// snapshot.
+//
+// Implements:
+//
+//	int rbd_snap_get_group_namespace(rbd_image_t image, uint64_t snap_id,
+//	                                 rbd_snap_group_namespace_t *group_snap,
+//	                                 size_t group_snap_size)
+func (image *Image) GetSnapGroupNamespace(snapID uint64) (*SnapGroupNamespace, error) {
+	if err := image.validate(imageIsOpen); err != nil {
+		return nil, err
+	}
+
+	var (
+		err error
+		sgn C.rbd_snap_group_namespace_t
+	)
+
+	ret := C.rbd_snap_get_group_namespace(image.image,
+		C.uint64_t(snapID),
+		&sgn,
+		C.sizeof_rbd_snap_group_namespace_t)
+	err = getError(ret)
+	if err != nil {
+		return nil, err
+	}
+
+	defer C.rbd_snap_group_namespace_cleanup(&sgn, C.sizeof_rbd_snap_group_namespace_t)
+
+	return &SnapGroupNamespace{
+		Pool:          uint64(sgn.group_pool),
+		GroupName:     C.GoString(sgn.group_name),
+		GroupSnapName: C.GoString(sgn.group_snap_name),
+	}, nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 0ff4bae3dec..914a8b8f114 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -81,7 +81,7 @@ github.com/antlr/antlr4/runtime/Go/antlr/v4
 # github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
 ## explicit
 github.com/asaskevich/govalidator
-# github.com/aws/aws-sdk-go v1.50.26
+# github.com/aws/aws-sdk-go v1.50.32
 ## explicit; go 1.19
 github.com/aws/aws-sdk-go/aws
 github.com/aws/aws-sdk-go/aws/auth/bearer
@@ -204,7 +204,7 @@ github.com/ceph/ceph-csi/api/deploy/kubernetes/cephfs
 github.com/ceph/ceph-csi/api/deploy/kubernetes/nfs
 github.com/ceph/ceph-csi/api/deploy/kubernetes/rbd
 github.com/ceph/ceph-csi/api/deploy/ocp
-# github.com/ceph/go-ceph v0.26.0
+# github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243
 ## explicit; go 1.19
 github.com/ceph/go-ceph/cephfs
 github.com/ceph/go-ceph/cephfs/admin
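Reviewer note on internal/journal/volumegroupjournal.go: splitting VolumeGroupJournalConfig (configuration only) from VolumeGroupJournalConnection (owner of the rados connection) changes the call sequence for consumers. Below is a minimal sketch of the intended usage, assuming the code lives inside the ceph-csi module so the internal packages are importable; the helper name and its arguments are hypothetical, only the signatures come from this diff.

```go
package example

import (
	"context"

	"github.com/ceph/ceph-csi/internal/journal"
	"github.com/ceph/ceph-csi/internal/util"
)

// reserveVolumeGroup is a hypothetical caller illustrating the new split:
// the config value is connection-free and reusable, Connect() hands back the
// VolumeGroupJournal implementation, and Destroy() is called on that value.
func reserveVolumeGroup(
	ctx context.Context,
	csiInstanceID, monitors, radosNamespace, journalPool, reqName, namePrefix string,
	cr *util.Credentials,
) error {
	// Configuration only; no ceph connection is opened here.
	cfg := journal.NewCSIVolumeGroupJournal(csiInstanceID)

	// Connect() now returns the connection-backed journal instead of
	// mutating the config in place.
	vgj, err := cfg.Connect(monitors, radosNamespace, cr)
	if err != nil {
		return err
	}
	defer vgj.Destroy()

	// Journal operations are methods on the returned connection.
	if _, err := vgj.CheckReservation(ctx, journalPool, reqName, namePrefix); err != nil {
		return err
	}

	return nil
}
```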
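Reviewer note on vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go: a hedged sketch of how the new FSQuiesce call could be driven. It assumes a build with the ceph_preview tag and an already established rados connection; the helper name and option values are illustrative, and only FSQuiesceOptions, FSQuiesceInfo, and the FSQuiesce signature come from this diff. Note that Timeout, Expiration, and AwaitFor pass through fixedPointFloat, so they reach the mgr command as two-decimal JSON numbers.

```go
package example

import (
	fsadmin "github.com/ceph/go-ceph/cephfs/admin"
	"github.com/ceph/go-ceph/rados"
)

// quiesceSubVolumes is a hypothetical helper: quiesce the given subvolumes and
// wait (Await) until the quiesce set reaches the quiesced state.
func quiesceSubVolumes(conn *rados.Conn, volume, group, setID string, subvolumes []string) (*fsadmin.FSQuiesceInfo, error) {
	fsa := fsadmin.NewFromConn(conn)

	opts := &fsadmin.FSQuiesceOptions{
		Timeout:  10.5, // sent as 10.50 via fixedPointFloat.MarshalJSON
		AwaitFor: 60,
		Await:    true,
	}

	info, err := fsa.FSQuiesce(volume, group, subvolumes, setID, opts)
	if err != nil {
		return nil, err
	}

	// info.Sets maps quiesce-set IDs to their QuiesceInfo (state, members, timeouts).
	return info, nil
}
```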
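Reviewer note on vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go: a sketch of resolving which RBD group snapshot an image snapshot belongs to. This also requires the ceph_preview tag; OpenImage and NoSnapshot are existing go-ceph rbd APIs, while the helper itself and its arguments are hypothetical.

```go
package example

import (
	"github.com/ceph/go-ceph/rados"
	librbd "github.com/ceph/go-ceph/rbd"
)

// groupSnapshotSource is a hypothetical helper that looks up the owning group
// and group snapshot for a snapshot ID using the new preview API.
func groupSnapshotSource(ioctx *rados.IOContext, imageName string, snapID uint64) (group, groupSnap string, err error) {
	img, err := librbd.OpenImage(ioctx, imageName, librbd.NoSnapshot)
	if err != nil {
		return "", "", err
	}
	defer img.Close()

	// snapID must identify a snapshot created as part of a group snapshot.
	sgn, err := img.GetSnapGroupNamespace(snapID)
	if err != nil {
		return "", "", err
	}

	return sgn.GroupName, sgn.GroupSnapName, nil
}
```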