diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index 261bc67787a7..beda20d5bf19 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -82,7 +82,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( ctx context.Context, req *volumegroup.CreateVolumeGroupRequest, ) (*volumegroup.CreateVolumeGroupResponse, error) { - mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets()) + mgr := rbd.NewManager(vs.csiID, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) // resolve all volumes @@ -132,8 +132,17 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( log.DebugLog(ctx, fmt.Sprintf("all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName())) + csiVG, err := vg.ToCSI(ctx) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to convert volume group %q to CSI type: %s", + req.GetName(), + err.Error()) + } + return &volumegroup.CreateVolumeGroupResponse{ - VolumeGroup: vg.ToCSI(ctx), + VolumeGroup: csiVG, }, nil } @@ -159,7 +168,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup( ctx context.Context, req *volumegroup.DeleteVolumeGroupRequest, ) (*volumegroup.DeleteVolumeGroupResponse, error) { - mgr := rbd.NewManager(nil, req.GetSecrets()) + mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets()) defer mgr.Destroy(ctx) // resolve the volume group diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index cf63381d1580..b5e8ebbd65d1 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -18,25 +18,31 @@ package rbd import ( "context" + "errors" + "fmt" - group "github.com/ceph/ceph-csi/internal/rbd_group" + "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/rbd/types" + group "github.com/ceph/ceph-csi/internal/rbd_group" "github.com/ceph/ceph-csi/internal/util" ) var _ types.Manager = &rbdManager{} type rbdManager struct { + csiID string parameters map[string]string secrets map[string]string - creds *util.Credentials + creds *util.Credentials + vgJournal journal.VolumeGroupJournal } // NewManager returns a new manager for handling Volume and Volume Group // operations, combining the requests for RBD and the journalling in RADOS. -func NewManager(parameters, secrets map[string]string) types.Manager { +func NewManager(csiID string, parameters, secrets map[string]string) types.Manager { return &rbdManager{ + csiID: csiID, parameters: parameters, secrets: secrets, } @@ -47,41 +53,163 @@ func (mgr *rbdManager) Destroy(ctx context.Context) { mgr.creds.DeleteCredentials() mgr.creds = nil } + + if mgr.vgJournal != nil { + mgr.vgJournal.Destroy() + mgr.vgJournal = nil + } } -// connect sets up credentials and connects to the journal. -func (mgr *rbdManager) connect() error { - if mgr.creds == nil { - creds, err := util.NewUserCredentials(mgr.secrets) - if err != nil { - return err - } +// getCredentials sets up credentials and connects to the journal. +func (mgr *rbdManager) getCredentials() (*util.Credentials, error) { + if mgr.creds != nil { + return mgr.creds, nil + } - mgr.creds = creds + creds, err := util.NewUserCredentials(mgr.secrets) + if err != nil { + return nil, fmt.Errorf("failed to get credentials: %w", err) } - return nil + mgr.creds = creds + + return creds, nil } -func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) { - if err := mgr.connect(); err != nil { +func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) { + if mgr.vgJournal != nil { + return mgr.vgJournal, nil + } + + creds, err := mgr.getCredentials() + if err != nil { return nil, err } - volume, err := GenVolFromVolID(ctx, id, mgr.creds, mgr.secrets) + monitors, err := util.Mons(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err) + } + + ns, err := util.GetRadosNamespace(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err) + } + + vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.csiID, ns) + + vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds) + if err != nil { + return nil, fmt.Errorf("failed to connect to journal: %w", err) + } + + mgr.vgJournal = vgJournal + + return vgJournal, nil +} + +func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) { + creds, err := mgr.getCredentials() if err != nil { return nil, err } + volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets) + if err != nil { + return nil, fmt.Errorf("failed to get volume from id %q: %w", id, err) + } + return volume, nil } func (mgr *rbdManager) GetVolumeGroupByID(ctx context.Context, id string) (types.VolumeGroup, error) { - return group.GetVolumeGroup(ctx, id, mgr.secrets) + vi := &util.CSIIdentifier{} + if err := vi.DecomposeCSIID(id); err != nil { + return nil, fmt.Errorf("failed to parse volume group id %q: %w", id, err) + } + + vgJournal, err := mgr.getVolumeGroupJournal(vi.ClusterID) + if err != nil { + return nil, err + } + + creds, err := mgr.getCredentials() + if err != nil { + return nil, err + } + + vg, err := group.GetVolumeGroup(ctx, id, vgJournal, creds) + if err != nil { + return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err) + } + + return vg, nil } func (mgr *rbdManager) CreateVolumeGroup(ctx context.Context, name string) (types.VolumeGroup, error) { - return nil, nil + creds, err := mgr.getCredentials() + if err != nil { + return nil, err + } + + clusterID, err := util.GetClusterID(mgr.parameters) + if err != nil { + return nil, fmt.Errorf("failed to get cluster-id: %w", err) + } + + vgJournal, err := mgr.getVolumeGroupJournal(clusterID) + if err != nil { + return nil, err + } + + // pool is a required parameter + pool, ok := mgr.parameters["pool"] + if !ok { + return nil, errors.New("required 'pool' parameter missing in volume attributes") + } + + // journalPool is an optional parameter, use pool if it is not set + journalPool, ok := mgr.parameters["journalPool"] + if !ok { + journalPool = pool + } + + // volumeNamePrefix is an optional parameter, can be an empty string + prefix := mgr.parameters["volumeNamePrefix"] + + uuid, vgName, err := vgJournal.ReserveName(ctx, journalPool, name, prefix) + if err != nil { + return nil, fmt.Errorf("failed to reserve volume group for name %q: %w", name, err) + } + + monitors, err := util.Mons(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err) + } + + _ /*journalPoolID*/, poolID, err := util.GetPoolIDs(ctx, monitors, journalPool, pool, creds) + if err != nil { + return nil, fmt.Errorf("failed to generate a unique CSI volume group id for %q: %w", vgName, err) + } + + csiID, err := util.GenerateVolID(ctx, monitors, creds, poolID, pool, clusterID, uuid) + if err != nil { + return nil, fmt.Errorf("failed to generate a unique CSI volume group id for %q: %w", vgName, err) + } + + vg, err := group.GetVolumeGroup(ctx, csiID, vgJournal, creds) + if err != nil { + return nil, fmt.Errorf("failed to get volume group %q at cluster %q: %w", name, clusterID, err) + } + + err = vg.Create(ctx, vgName) + if err != nil { + return nil, fmt.Errorf("failed to create volume group %q: %w", name, err) + } + + // TODO: create the volume group in the journal + + return vg, nil } func (mgr *rbdManager) DeleteVolumeGroup(ctx context.Context, vg types.VolumeGroup) error {