Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: add backend support for VolumeGroup operations #4719

Merged
merged 9 commits into from
Jul 24, 2024
37 changes: 25 additions & 12 deletions internal/csi-addons/rbd/volumegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package rbd

import (
"context"
"fmt"

"github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/rbd/types"
Expand All @@ -37,12 +36,17 @@ type VolumeGroupServer struct {
// if volumegroup spec add more RPC services in the proto file, then we
// don't need to add all RPC methods leading to forward compatibility.
*volumegroup.UnimplementedControllerServer

// csiID is the unique ID for this CSI-driver deployment.
csiID string
}

// NewVolumeGroupServer creates a new VolumeGroupServer which handles the
// VolumeGroup Service requests from the CSI-Addons specification.
func NewVolumeGroupServer() *VolumeGroupServer {
return &VolumeGroupServer{}
func NewVolumeGroupServer(instanceID string) *VolumeGroupServer {
return &VolumeGroupServer{
csiID: instanceID,
}
}

func (vs *VolumeGroupServer) RegisterService(server grpc.ServiceRegistrar) {
Expand Down Expand Up @@ -77,7 +81,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
ctx context.Context,
req *volumegroup.CreateVolumeGroupRequest,
) (*volumegroup.CreateVolumeGroupResponse, error) {
mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(vs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve all volumes
Expand All @@ -98,7 +102,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
volumes[i] = vol
}

log.DebugLog(ctx, fmt.Sprintf("all %d Volumes for VolumeGroup %q have been found", len(volumes), req.GetName()))
log.DebugLog(ctx, "all %d Volumes for VolumeGroup %q have been found", len(volumes), req.GetName())

// create a RBDVolumeGroup
vg, err := mgr.CreateVolumeGroup(ctx, req.GetName())
Expand All @@ -110,7 +114,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
err.Error())
}

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q had been created", req.GetName()))
log.DebugLog(ctx, "VolumeGroup %q has been created: %+v", req.GetName(), vg)

// add each rbd-image to the RBDVolumeGroup
for _, vol := range volumes {
Expand All @@ -125,10 +129,19 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
}
}

log.DebugLog(ctx, fmt.Sprintf("all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName()))
log.DebugLog(ctx, "all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName())

csiVG, err := vg.ToCSI(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to convert volume group %q to CSI type: %s",
req.GetName(),
err.Error())
}

return &volumegroup.CreateVolumeGroupResponse{
VolumeGroup: vg.ToCSI(ctx),
VolumeGroup: csiVG,
}, nil
}

Expand All @@ -154,7 +167,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
ctx context.Context,
req *volumegroup.DeleteVolumeGroupRequest,
) (*volumegroup.DeleteVolumeGroupResponse, error) {
mgr := rbd.NewManager(nil, req.GetSecrets())
mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve the volume group
Expand All @@ -168,7 +181,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
}
defer vg.Destroy(ctx)

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q has been found", req.GetVolumeGroupId()))
log.DebugLog(ctx, "VolumeGroup %q has been found", req.GetVolumeGroupId())

// verify that the volume group is empty
volumes, err := vg.ListVolumes(ctx)
Expand All @@ -180,7 +193,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
err.Error())
}

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes)))
log.DebugLog(ctx, "VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes))

if len(volumes) != 0 {
return nil, status.Errorf(
Expand All @@ -198,7 +211,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
err.Error())
}

log.DebugLog(ctx, fmt.Sprintf("VolumeGroup %q has been deleted", req.GetVolumeGroupId()))
log.DebugLog(ctx, "VolumeGroup %q has been deleted", req.GetVolumeGroupId())

return &volumegroup.DeleteVolumeGroupResponse{}, nil
}
22 changes: 13 additions & 9 deletions internal/journal/volumegroupjournal.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,15 @@ type VolumeGroupJournalConfig struct {
Config
}

type VolumeGroupJournalConnection struct {
type volumeGroupJournalConnection struct {
config *VolumeGroupJournalConfig
connection *Connection
}

// assert that volumeGroupJournalConnection implements the VolumeGroupJournal
// interface.
var _ VolumeGroupJournal = &volumeGroupJournalConnection{}

// NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups.
func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig {
return VolumeGroupJournalConfig{
Expand Down Expand Up @@ -116,7 +120,7 @@ func (vgc *VolumeGroupJournalConfig) Connect(
namespace string,
cr *util.Credentials,
) (VolumeGroupJournal, error) {
vgjc := &VolumeGroupJournalConnection{}
vgjc := &volumeGroupJournalConnection{}
vgjc.config = &VolumeGroupJournalConfig{
Config: vgc.Config,
}
Expand All @@ -130,7 +134,7 @@ func (vgc *VolumeGroupJournalConfig) Connect(
}

// Destroy frees any resources and invalidates the journal connection.
func (vgjc *VolumeGroupJournalConnection) Destroy() {
func (vgjc *volumeGroupJournalConnection) Destroy() {
vgjc.connection.Destroy()
}

Expand Down Expand Up @@ -167,7 +171,7 @@ Return values:
reservation found.
- error: non-nil in case of any errors.
*/
func (vgjc *VolumeGroupJournalConnection) CheckReservation(ctx context.Context,
func (vgjc *volumeGroupJournalConnection) CheckReservation(ctx context.Context,
journalPool, reqName, namePrefix string,
) (*VolumeGroupData, error) {
var (
Expand Down Expand Up @@ -244,7 +248,7 @@ Input arguments:
- groupID: ID of the volume group, generated from the UUID
- reqName: Request name for the volume group
*/
func (vgjc *VolumeGroupJournalConnection) UndoReservation(ctx context.Context,
func (vgjc *volumeGroupJournalConnection) UndoReservation(ctx context.Context,
csiJournalPool, groupID, reqName string,
) error {
// delete volume UUID omap (first, inverse of create order)
Expand Down Expand Up @@ -303,7 +307,7 @@ Return values:
- string: Contains the VolumeGroup name that was reserved for the passed in reqName
- error: non-nil in case of any errors
*/
func (vgjc *VolumeGroupJournalConnection) ReserveName(ctx context.Context,
func (vgjc *volumeGroupJournalConnection) ReserveName(ctx context.Context,
journalPool, reqName, namePrefix string,
) (string, string, error) {
cj := vgjc.config
Expand Down Expand Up @@ -366,7 +370,7 @@ type VolumeGroupAttributes struct {
VolumeMap map[string]string // Contains the volumeID and the corresponding value mapping
}

func (vgjc *VolumeGroupJournalConnection) GetVolumeGroupAttributes(
func (vgjc *volumeGroupJournalConnection) GetVolumeGroupAttributes(
ctx context.Context,
pool, objectUUID string,
) (*VolumeGroupAttributes, error) {
Expand Down Expand Up @@ -401,7 +405,7 @@ func (vgjc *VolumeGroupJournalConnection) GetVolumeGroupAttributes(
return groupAttributes, nil
}

func (vgjc *VolumeGroupJournalConnection) AddVolumesMapping(
func (vgjc *volumeGroupJournalConnection) AddVolumesMapping(
ctx context.Context,
pool,
reservedUUID string,
Expand All @@ -418,7 +422,7 @@ func (vgjc *VolumeGroupJournalConnection) AddVolumesMapping(
return nil
}

func (vgjc *VolumeGroupJournalConnection) RemoveVolumesMapping(
func (vgjc *volumeGroupJournalConnection) RemoveVolumesMapping(
ctx context.Context,
pool,
reservedUUID string,
Expand Down
21 changes: 14 additions & 7 deletions internal/rbd/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (cs *ControllerServer) parseVolCreateRequest(
return rbdVol, nil
}

func (rbdVol *rbdVolume) ToCSI(ctx context.Context) *csi.Volume {
func (rbdVol *rbdVolume) ToCSI(ctx context.Context) (*csi.Volume, error) {
vol := &csi.Volume{
VolumeId: rbdVol.VolID,
CapacityBytes: rbdVol.VolSize,
Expand All @@ -266,22 +266,29 @@ func (rbdVol *rbdVolume) ToCSI(ctx context.Context) *csi.Volume {
}
}

return vol
return vol, nil
}

func buildCreateVolumeResponse(
ctx context.Context,
req *csi.CreateVolumeRequest,
rbdVol *rbdVolume,
) *csi.CreateVolumeResponse {
volume := rbdVol.ToCSI(ctx)
) (*csi.CreateVolumeResponse, error) {
volume, err := rbdVol.ToCSI(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"BUG, can not happen: failed to convert volume %q to CSI type: %v",
rbdVol, err)
}

volume.ContentSource = req.GetVolumeContentSource()

for param, value := range util.GetVolumeContext(req.GetParameters()) {
volume.VolumeContext[param] = value
}

return &csi.CreateVolumeResponse{Volume: volume}
return &csi.CreateVolumeResponse{Volume: volume}, nil
}

// getGRPCErrorForCreateVolume converts the returns the GRPC errors based on
Expand Down Expand Up @@ -424,7 +431,7 @@ func (cs *ControllerServer) CreateVolume(
return nil, status.Error(codes.Internal, err.Error())
}

return buildCreateVolumeResponse(ctx, req, rbdVol), nil
return buildCreateVolumeResponse(ctx, req, rbdVol)
}

// flattenParentImage is to be called before proceeding with creating volume,
Expand Down Expand Up @@ -559,7 +566,7 @@ func (cs *ControllerServer) repairExistingVolume(ctx context.Context, req *csi.C
return nil, err
}

return buildCreateVolumeResponse(ctx, req, rbdVol), nil
return buildCreateVolumeResponse(ctx, req, rbdVol)
}

// check snapshots on the rbd image, as we have limit from krbd that an image
Expand Down
2 changes: 1 addition & 1 deletion internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
rcs := casrbd.NewReplicationServer(NewControllerServer(r.cd))
r.cas.RegisterService(rcs)

vgcs := casrbd.NewVolumeGroupServer()
vgcs := casrbd.NewVolumeGroupServer(conf.InstanceID)
r.cas.RegisterService(vgcs)
}

Expand Down
79 changes: 79 additions & 0 deletions internal/rbd/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2024 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package rbd

import (
"context"
"fmt"

librbd "github.com/ceph/go-ceph/rbd"

"github.com/ceph/ceph-csi/internal/rbd/types"
Comment on lines +23 to +25
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rearrange the order here? cephcsi import should come first

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this guideline is not followed everywere. I think and Golang suggests to have imports of local packages at the bottom, that seems to be a coding convention many other Go projects use.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/ceph/ceph-csi/blob/devel/docs/coding.md#imports this is one we have, lets stick to it and also update other places if its not followed as followup cleanup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #4721. I don't think we should strictly enforce it for existing imports, but it would be nice to use this order in new files.

)

// AddToGroup adds the image to the group. This is called from the rbd_group
// package.
func (rv *rbdVolume) AddToGroup(ctx context.Context, vg types.VolumeGroup) error {
ioctx, err := vg.GetIOContext(ctx)
Madhu-1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("could not get iocontext for volume group %q: %w", vg, err)
}

name, err := vg.GetName(ctx)
if err != nil {
return fmt.Errorf("could not get name for volume group %q: %w", vg, err)
}

// check if the image is already part of a group
// "rbd: ret=-17, File exists" is returned if the image is part of ANY group
image, err := rv.open()
if err != nil {
return fmt.Errorf("failed to open image %q: %w", rv, err)
}

info, err := image.GetGroup()
if err != nil {
return fmt.Errorf("could not get group information for image %q: %w", rv, err)
}

if info.Name != "" && info.Name != name {
return fmt.Errorf("image %q is already part of volume group %q", rv, info.Name)
}

err = librbd.GroupImageAdd(ioctx, name, rv.ioctx, rv.RbdImageName)
if err != nil {
return fmt.Errorf("failed to add image %q to volume group %q: %w", rv, vg, err)
}

return nil
}

// RemoveFromGroup removes the image from the group. This is called from the
// rbd_group package.
func (rv *rbdVolume) RemoveFromGroup(ctx context.Context, vg types.VolumeGroup) error {
ioctx, err := vg.GetIOContext(ctx)
Madhu-1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("could not get iocontext for volume group %q: %w", vg, err)
}

name, err := vg.GetName(ctx)
if err != nil {
return fmt.Errorf("could not get name for volume group %q: %w", vg, err)
}

return librbd.GroupImageRemove(ioctx, name, rv.ioctx, rv.RbdImageName)
}
Loading