Skip to content

Commit

Permalink
rbd: use the Manager to handle CSI-Addons VolumeGroup requests
Browse files Browse the repository at this point in the history
Signed-off-by: Niels de Vos <ndevos@ibm.com>
  • Loading branch information
nixpanic committed Jul 18, 2024
1 parent 34f787d commit beaf170
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 20 deletions.
15 changes: 12 additions & 3 deletions internal/csi-addons/rbd/volumegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
ctx context.Context,
req *volumegroup.CreateVolumeGroupRequest,
) (*volumegroup.CreateVolumeGroupResponse, error) {
mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(vs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve all volumes
Expand Down Expand Up @@ -132,8 +132,17 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(

log.DebugLog(ctx, fmt.Sprintf("all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName()))

csiVG, err := vg.ToCSI(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to convert volume group %q to CSI type: %s",
req.GetName(),
err.Error())
}

return &volumegroup.CreateVolumeGroupResponse{
VolumeGroup: vg.ToCSI(ctx),
VolumeGroup: csiVG,
}, nil
}

Expand All @@ -159,7 +168,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
ctx context.Context,
req *volumegroup.DeleteVolumeGroupRequest,
) (*volumegroup.DeleteVolumeGroupResponse, error) {
mgr := rbd.NewManager(nil, req.GetSecrets())
mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve the volume group
Expand Down
163 changes: 146 additions & 17 deletions internal/rbd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,30 @@ package rbd
import (
"context"
"errors"
"fmt"

"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/rbd/types"
group "github.com/ceph/ceph-csi/internal/rbd_group"
"github.com/ceph/ceph-csi/internal/util"
)

var _ types.Manager = &rbdManager{}

type rbdManager struct {
csiID string
parameters map[string]string
secrets map[string]string

creds *util.Credentials
creds *util.Credentials
vgJournal journal.VolumeGroupJournal
}

// NewManager returns a new manager for handling Volume and Volume Group
// operations, combining the requests for RBD and the journalling in RADOS.
func NewManager(parameters, secrets map[string]string) types.Manager {
func NewManager(csiID string, parameters, secrets map[string]string) types.Manager {
return &rbdManager{
csiID: csiID,
parameters: parameters,
secrets: secrets,
}
Expand All @@ -47,43 +53,166 @@ func (mgr *rbdManager) Destroy(ctx context.Context) {
mgr.creds.DeleteCredentials()
mgr.creds = nil
}

if mgr.vgJournal != nil {
mgr.vgJournal.Destroy()
mgr.vgJournal = nil
}
}

// connect sets up credentials and connects to the journal.
func (mgr *rbdManager) connect() error {
if mgr.creds == nil {
creds, err := util.NewUserCredentials(mgr.secrets)
if err != nil {
return err
}
// getCredentials sets up credentials and connects to the journal.
func (mgr *rbdManager) getCredentials() (*util.Credentials, error) {
if mgr.creds != nil {
return mgr.creds, nil
}

mgr.creds = creds
creds, err := util.NewUserCredentials(mgr.secrets)
if err != nil {
return nil, fmt.Errorf("failed to get credentials: %w", err)
}

return nil
mgr.creds = creds

return creds, nil
}

func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) {
if err := mgr.connect(); err != nil {
func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) {
if mgr.vgJournal != nil {
return mgr.vgJournal, nil
}

creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}

volume, err := GenVolFromVolID(ctx, id, mgr.creds, mgr.secrets)
monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err)
}

ns, err := util.GetRadosNamespace(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err)
}

vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.csiID, ns)

vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds)
if err != nil {
return nil, fmt.Errorf("failed to connect to journal: %w", err)
}

mgr.vgJournal = vgJournal

return vgJournal, nil
}

func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) {
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}

volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets)
if err != nil {
return nil, fmt.Errorf("failed to get volume from id %q: %w", id, err)
}

return volume, nil
}

func (mgr *rbdManager) GetVolumeGroupByID(ctx context.Context, id string) (types.VolumeGroup, error) {
return nil, errors.New("rbdManager.GetVolumeGroupByID() is not implemented yet")
vi := &util.CSIIdentifier{}
if err := vi.DecomposeCSIID(id); err != nil {
return nil, fmt.Errorf("failed to parse volume group id %q: %w", id, err)
}

vgJournal, err := mgr.getVolumeGroupJournal(vi.ClusterID)
if err != nil {
return nil, err
}

creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}

vg, err := group.GetVolumeGroup(ctx, id, vgJournal, creds)
if err != nil {
return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err)
}

return vg, nil
}

func (mgr *rbdManager) CreateVolumeGroup(ctx context.Context, name string) (types.VolumeGroup, error) {
return nil, errors.New("rbdManager.CreateVolumeGroup() is not implemented yet")
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}

clusterID, err := util.GetClusterID(mgr.parameters)
if err != nil {
return nil, fmt.Errorf("failed to get cluster-id: %w", err)
}

vgJournal, err := mgr.getVolumeGroupJournal(clusterID)
if err != nil {
return nil, err
}

// pool is a required parameter
pool, ok := mgr.parameters["pool"]
if !ok {
return nil, errors.New("required 'pool' parameter missing in volume attributes")
}

// journalPool is an optional parameter, use pool if it is not set
journalPool, ok := mgr.parameters["journalPool"]
if !ok {
journalPool = pool
}

// volumeNamePrefix is an optional parameter, can be an empty string
prefix := mgr.parameters["volumeNamePrefix"]

uuid, vgName, err := vgJournal.ReserveName(ctx, journalPool, name, prefix)
if err != nil {
return nil, fmt.Errorf("failed to reserve volume group for name %q: %w", name, err)
}

monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err)
}

_ /*journalPoolID*/, poolID, err := util.GetPoolIDs(ctx, monitors, journalPool, pool, creds)
if err != nil {
return nil, fmt.Errorf("failed to generate a unique CSI volume group id for %q: %w", vgName, err)
}

csiID, err := util.GenerateVolID(ctx, monitors, creds, poolID, pool, clusterID, uuid)
if err != nil {
return nil, fmt.Errorf("failed to generate a unique CSI volume group id for %q: %w", vgName, err)
}

vg, err := group.GetVolumeGroup(ctx, csiID, vgJournal, creds)
if err != nil {
return nil, fmt.Errorf("failed to get volume group %q at cluster %q: %w", name, clusterID, err)
}

err = vg.Create(ctx, vgName)
if err != nil {
return nil, fmt.Errorf("failed to create volume group %q: %w", name, err)
}

// TODO: create the volume group in the journal

return vg, nil
}

func (mgr *rbdManager) DeleteVolumeGroup(ctx context.Context, vg types.VolumeGroup) error {
return errors.New("rbdManager.CreateVolumeGroup() is not implemented yet")
// TODO: remove from journal
return vg.Delete(ctx)
}

0 comments on commit beaf170

Please sign in to comment.