Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metro volume expand support #350

Merged
merged 18 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/dell/gofsutil v1.16.1
github.com/dell/goiscsi v1.9.0
github.com/dell/gonvme v1.8.1
github.com/dell/gopowerstore v1.15.2-0.20240924141025-1c719e612669
github.com/dell/gopowerstore v1.15.2-0.20241004144335-14f8fa34731c
github.com/fsnotify/fsnotify v1.7.0
github.com/go-openapi/strfmt v0.23.0
github.com/golang/mock v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ github.com/dell/goiscsi v1.9.0 h1:VvMHbAO4vk80oc/TAbQPYlxysscCqVBW78GyPoUxgik=
github.com/dell/goiscsi v1.9.0/go.mod h1:NI/W/0O1UrMW2zVdMxy4z395Jn0r7utH6RQDFSZiFyQ=
github.com/dell/gonvme v1.8.1 h1:46M5lPqj7+Xjen+qxooRN9cx/+uJG4xtK9TpwduWDgE=
github.com/dell/gonvme v1.8.1/go.mod h1:ajbuF+fswq+ty2tRTG5FN4ecIMJsG7aDu/bkMynTKAs=
github.com/dell/gopowerstore v1.15.2-0.20240924141025-1c719e612669 h1:XktIu9B0VskV/nLyDFtsurvCVedi3czY0ziq52lf5RU=
github.com/dell/gopowerstore v1.15.2-0.20240924141025-1c719e612669/go.mod h1:vyN1JAZ+TO7Px+gNVa61a23/KwlI/Nj/6ttzMOQFyG0=
github.com/dell/gopowerstore v1.15.2-0.20241004144335-14f8fa34731c h1:/zP7pDYTZJePpfrXlqchxP5/q8+qF1ezZaOEp5qYFG0=
github.com/dell/gopowerstore v1.15.2-0.20241004144335-14f8fa34731c/go.mod h1:vyN1JAZ+TO7Px+gNVa61a23/KwlI/Nj/6ttzMOQFyG0=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
Expand Down
160 changes: 127 additions & 33 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,20 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
replicationEnabled := params[s.WithRP(KeyReplicationEnabled)]
var remoteSystemName string
isMetroVolume := false
isMetroVolumeGroup := false

if replicationEnabled == "true" && !useNFS {
if replicationEnabled == "true" {
if useNFS {
return nil, status.Error(codes.InvalidArgument, "replication not supported for NFS")
}

log.Info("Preparing volume replication")

remoteSystemName, ok = params[s.WithRP(KeyReplicationRemoteSystem)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no remote system specified in storage class")
return nil, status.Error(codes.InvalidArgument, "replication enabled but no remote system specified in storage class")
}
repMode := params[s.WithRP(KeyReplicationMode)]
// Default to ASYNC for backward compatibility
if repMode == "" {
repMode = common.AsyncMode
}
Expand All @@ -273,32 +277,32 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
log.Infof("%s replication mode requested", repMode)
vgPrefix, ok := params[s.WithRP(KeyReplicationVGPrefix)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no volume group prefix specified in storage class")
return nil, status.Error(codes.InvalidArgument, "replication enabled but no volume group prefix specified in storage class")
}

rpo, ok := params[s.WithRP(KeyReplicationRPO)]
if !ok {
// If Replication mode is ASYNC and there is no RPO specified, returning an error
if repMode == common.AsyncMode {
return nil, status.Errorf(codes.InvalidArgument, "replication mode is ASYNC but no RPO specified in storage class")
return nil, status.Error(codes.InvalidArgument, "replication mode is ASYNC but no RPO specified in storage class")
}
// If Replication mode is SYNC and there is no RPO, defaulting the value to Zero
rpo = common.Zero
}
rpoEnum := gopowerstore.RPOEnum(rpo)
if err := rpoEnum.IsValid(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid RPO value")
return nil, status.Error(codes.InvalidArgument, "invalid RPO value")
}

// Validating RPO to be non Zero when replication mode is ASYNC
if repMode == common.AsyncMode && rpo == common.Zero {
log.Errorf("RPO value for %s cannot be : %s", repMode, rpo)
return nil, status.Errorf(codes.InvalidArgument, "replication mode ASYNC requires RPO value to be non Zero")
return nil, status.Error(codes.InvalidArgument, "replication mode ASYNC requires RPO value to be non Zero")
}

// Validating RPO to be Zero whe replication mode is SYNC
if repMode == common.SyncMode && rpo != common.Zero {
return nil, status.Errorf(codes.InvalidArgument, "replication mode SYNC requires RPO value to be Zero")
return nil, status.Error(codes.InvalidArgument, "replication mode SYNC requires RPO value to be Zero")
}
namespace := ""
if ignoreNS, ok := params[s.WithRP(KeyReplicationIgnoreNamespaces)]; ok && ignoreNS == "false" {
Expand Down Expand Up @@ -370,11 +374,13 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
}
}

// Pass the VolumeGroup to the creator so it can create the new volume inside the vg
if c, ok := creator.(*SCSICreator); ok {
c.vg = &vg
}
case common.Metro:
// handle Metro mode where metro is configured directly on the volume (or volume group if requested)
// handle Metro mode where metro is configured directly on the volume
// Note: Metro on volume group support is not added
log.Info("Metro replication mode requested")

// Get specified remote system object for its ID
Expand All @@ -383,13 +389,7 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
return nil, status.Errorf(codes.Internal, "can't query remote system by name: %s", err.Error())
}

// TODO If volumeGroup input is specified in SC - Verify VolumeGroup exists, if not create one
// There shouldn't be any protection policy on it with replication rule
// Cannot configure Metro on empty VG. Configure after volume is added
// Check if the above sync/async block can be optimized w.r.t volume group calls
// isMetroVolumeGroup = true

isMetroVolume = true // set to true if volume group is not specified
isMetroVolume = true // set to true
default:
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but invalid replication mode specified in storage class")
}
Expand All @@ -413,7 +413,6 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
}

metroVolumeIDSuffix := ""

if isMetroVolume {
// Configure Metro on volume
volID := volumeResponse.VolumeId
Expand Down Expand Up @@ -444,11 +443,6 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
}
// Build the metro volume handle suffix
metroVolumeIDSuffix = ":" + replicationSession.RemoteResourceID + "/" + remoteSystem.SerialNumber
} else if isMetroVolumeGroup {
// TODO configure Metro on volume group if it is first time
// else pause and resume metro session for adding new volumes
// Session needs to be paused before the new volume can be added (before creator.Create()) and then resumed later here.
log.Warn("Configuring Metro on volume group, not yet implemented.")
}

// Fetch the service tag
Expand Down Expand Up @@ -482,7 +476,7 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}

id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
id, arrayID, protocol, remoteVolumeID, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.DeleteVolumeResponse{}, nil
Expand Down Expand Up @@ -572,7 +566,6 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
return nil, err

} else if protocol == "scsi" {
// query volume groups?
vgs, err := arr.GetClient().GetVolumeGroupsByVolumeID(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
Expand All @@ -586,8 +579,11 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
// TODO: Maybe adding volumegroup id/name to volume id can help?
_, err := arr.GetClient().RemoveMembersFromVolumeGroup(ctx, &gopowerstore.VolumeGroupMembers{VolumeIDs: []string{id}}, vgs.VolumeGroup[0].ID)
if err != nil {
// TODO: check for idempotency cases
return nil, err
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.VolumeAlreadyRemovedFromVolumeGroup() { // idempotency check
log.Debugf("Volume %s has already been removed from volume group %s", id, vgs.VolumeGroup[0].ID) // continue to delete volume
santhoshatdell marked this conversation as resolved.
Show resolved Hide resolved
} else {
return nil, status.Errorf(codes.Internal, "failed to remove volume %s from volume group: %s", id, err.Error())
}
}

// Unassign protection policy
Expand Down Expand Up @@ -623,6 +619,8 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
if err != nil {
return nil, status.Errorf(codes.Internal, "failure ending metro session on volume: %s", err.Error())
}
} else if remoteVolumeID != "" {
log.Debugf("Expected metro session for volume %s, but it seems to have been already removed.", id)
}

// Delete volume
Expand Down Expand Up @@ -1351,9 +1349,68 @@ func (s *Service) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReque
}, nil
}

func pauseMetroSession(ctx context.Context, metroSessionID string, arr *array.PowerStoreArray) (paused bool, err error) {
metroSession, err := arr.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session %s", metroSessionID)
}

// confirm the session is in a state we can pause from.
if metroSession.State != gopowerstore.RsStateOk &&
metroSession.State != gopowerstore.RsStateSynchronizing &&
metroSession.State != gopowerstore.RsStatePaused &&
metroSession.State != gopowerstore.RsStateSystemPaused &&
metroSession.State != gopowerstore.RsStateFractured {
return false, fmt.Errorf("could not pause the metro replication session, %s, because the session is not in expected state to pause", metroSession.ID)
donatwork marked this conversation as resolved.
Show resolved Hide resolved
}

if metroSession.State != gopowerstore.RsStatePaused {
log.Debugf("pausing metro replication session, %s", metroSession.ID)

// pause the replication session
_, err := arr.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionPause, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be paused: %s", metroSession.ID, err.Error())
}
} else {
log.Debugf("metro replication session, %s, already paused", metroSession.ID)
}
return true, nil
}

func resumeMetroSession(ctx context.Context, metroSessionID string, array *array.PowerStoreArray) (resumed bool, err error) {
metroSession, err := array.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session: %s", err.Error())
}

// nothing to do if not paused
if metroSession.State == gopowerstore.RsStateOk ||
metroSession.State == gopowerstore.RsStateSynchronizing ||
metroSession.State == gopowerstore.RsStateResuming ||
metroSession.State == gopowerstore.RsStateSwitchingToMetroSync ||
metroSession.State == gopowerstore.RsStateFractured {
log.Debugf("metro replication session, %s, already resumed", metroSession.ID)
return false, nil
}

// metro session can only be resumed if it is in 'paused' state
if metroSession.State != gopowerstore.RsStatePaused {
return false, errors.New("the metro session must be in 'paused' state before resuming")
donatwork marked this conversation as resolved.
Show resolved Hide resolved
}

log.Debugf("resuming metro replication session %s", metroSession.ID)
_, err = array.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionResume, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be resumed: %s", metroSession.ID, err.Error())
}

return true, nil
}

// ControllerExpandVolume resizes Volume or FileSystem by increasing available volume capacity in the storage array.
func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, req.VolumeId, s.DefaultArray(), nil)
id, arrayID, protocol, remoteVolumeID, _, err := array.ParseVolumeID(ctx, req.VolumeId, s.DefaultArray(), nil)
if err != nil {
return nil, status.Errorf(codes.OutOfRange, "unable to parse the volume id")
}
Expand All @@ -1363,24 +1420,61 @@ func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.Controlle
return nil, status.Errorf(codes.OutOfRange, "volume exceeds allowed limit")
}

array, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "unable to find array with ID %s", arrayID)
}
client := array.Client

if protocol == "scsi" {
vol, err := s.Arrays()[arrayID].Client.GetVolume(ctx, id)
vol, err := client.GetVolume(ctx, id)
if err != nil {
return nil, status.Errorf(codes.OutOfRange, "detected SCSI protocol but wasn't able to fetch the volume info")
return nil, status.Error(codes.NotFound, "detected SCSI protocol but wasn't able to fetch the volume info")
}

volExpanded := false // to return appropriate response based on whether the volume is expanded or not
isMetro := remoteVolumeID != ""
if isMetro && vol.MetroReplicationSessionID == "" {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session ID is empty for metro volume", vol.Name)
}

if vol.Size < requiredBytes {
_, err = s.Arrays()[arrayID].Client.ModifyVolume(context.Background(), &gopowerstore.VolumeModify{Size: requiredBytes}, id)
if isMetro {
// must pause metro session before modifying the volume
_, err = pauseMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be paused: %s", vol.Name, err.Error())
}
}

_, err = client.ModifyVolume(context.Background(), &gopowerstore.VolumeModify{Size: requiredBytes}, id)
if err != nil {
return nil, err
return nil, status.Errorf(codes.Internal, "unable to modify volume size: %s", err.Error())
}
volExpanded = true
}

// check the metro session state and resume if necessary
// in case the previous request failed after expanding the volume, resume the session
if isMetro {
volExpanded, err = resumeMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be resumed: %s", vol.Name, err.Error())
}
}
if volExpanded {
return &csi.ControllerExpandVolumeResponse{CapacityBytes: requiredBytes, NodeExpansionRequired: true}, nil
}
return &csi.ControllerExpandVolumeResponse{}, nil
}
fs, err := s.Arrays()[arrayID].Client.GetFS(ctx, id)

fs, err := client.GetFS(ctx, id)
if err == nil {
if fs.SizeTotal < requiredBytes {
_, err = s.Arrays()[arrayID].Client.ModifyFS(context.Background(), &gopowerstore.FSModify{Size: int(requiredBytes + ReservedSize)}, id)
_, err = client.ModifyFS(context.Background(), &gopowerstore.FSModify{Size: int(requiredBytes + ReservedSize)}, id)
if err != nil {
return nil, err
}
Expand Down
Loading