Skip to content

Commit

Permalink
Metro volume expand support (#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
aqu-dell authored Oct 7, 2024
1 parent c18f172 commit 6d7a601
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 61 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/dell/gofsutil v1.16.1
github.com/dell/goiscsi v1.9.0
github.com/dell/gonvme v1.8.1
github.com/dell/gopowerstore v1.15.2-0.20240924141025-1c719e612669
github.com/dell/gopowerstore v1.15.2-0.20241004144335-14f8fa34731c
github.com/fsnotify/fsnotify v1.7.0
github.com/go-openapi/strfmt v0.23.0
github.com/golang/mock v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ github.com/dell/goiscsi v1.9.0 h1:VvMHbAO4vk80oc/TAbQPYlxysscCqVBW78GyPoUxgik=
github.com/dell/goiscsi v1.9.0/go.mod h1:NI/W/0O1UrMW2zVdMxy4z395Jn0r7utH6RQDFSZiFyQ=
github.com/dell/gonvme v1.8.1 h1:46M5lPqj7+Xjen+qxooRN9cx/+uJG4xtK9TpwduWDgE=
github.com/dell/gonvme v1.8.1/go.mod h1:ajbuF+fswq+ty2tRTG5FN4ecIMJsG7aDu/bkMynTKAs=
github.com/dell/gopowerstore v1.15.2-0.20240924141025-1c719e612669 h1:XktIu9B0VskV/nLyDFtsurvCVedi3czY0ziq52lf5RU=
github.com/dell/gopowerstore v1.15.2-0.20240924141025-1c719e612669/go.mod h1:vyN1JAZ+TO7Px+gNVa61a23/KwlI/Nj/6ttzMOQFyG0=
github.com/dell/gopowerstore v1.15.2-0.20241004144335-14f8fa34731c h1:/zP7pDYTZJePpfrXlqchxP5/q8+qF1ezZaOEp5qYFG0=
github.com/dell/gopowerstore v1.15.2-0.20241004144335-14f8fa34731c/go.mod h1:vyN1JAZ+TO7Px+gNVa61a23/KwlI/Nj/6ttzMOQFyG0=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
Expand Down
160 changes: 127 additions & 33 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,20 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
replicationEnabled := params[s.WithRP(KeyReplicationEnabled)]
var remoteSystemName string
isMetroVolume := false
isMetroVolumeGroup := false

if replicationEnabled == "true" && !useNFS {
if replicationEnabled == "true" {
if useNFS {
return nil, status.Error(codes.InvalidArgument, "replication not supported for NFS")
}

log.Info("Preparing volume replication")

remoteSystemName, ok = params[s.WithRP(KeyReplicationRemoteSystem)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no remote system specified in storage class")
return nil, status.Error(codes.InvalidArgument, "replication enabled but no remote system specified in storage class")
}
repMode := params[s.WithRP(KeyReplicationMode)]
// Default to ASYNC for backward compatibility
if repMode == "" {
repMode = common.AsyncMode
}
Expand All @@ -273,32 +277,32 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
log.Infof("%s replication mode requested", repMode)
vgPrefix, ok := params[s.WithRP(KeyReplicationVGPrefix)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no volume group prefix specified in storage class")
return nil, status.Error(codes.InvalidArgument, "replication enabled but no volume group prefix specified in storage class")
}

rpo, ok := params[s.WithRP(KeyReplicationRPO)]
if !ok {
// If Replication mode is ASYNC and there is no RPO specified, returning an error
if repMode == common.AsyncMode {
return nil, status.Errorf(codes.InvalidArgument, "replication mode is ASYNC but no RPO specified in storage class")
return nil, status.Error(codes.InvalidArgument, "replication mode is ASYNC but no RPO specified in storage class")
}
// If Replication mode is SYNC and there is no RPO, defaulting the value to Zero
rpo = common.Zero
}
rpoEnum := gopowerstore.RPOEnum(rpo)
if err := rpoEnum.IsValid(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid RPO value")
return nil, status.Error(codes.InvalidArgument, "invalid RPO value")
}

// Validating RPO to be non Zero when replication mode is ASYNC
if repMode == common.AsyncMode && rpo == common.Zero {
log.Errorf("RPO value for %s cannot be : %s", repMode, rpo)
return nil, status.Errorf(codes.InvalidArgument, "replication mode ASYNC requires RPO value to be non Zero")
return nil, status.Error(codes.InvalidArgument, "replication mode ASYNC requires RPO value to be non Zero")
}

// Validating RPO to be Zero whe replication mode is SYNC
if repMode == common.SyncMode && rpo != common.Zero {
return nil, status.Errorf(codes.InvalidArgument, "replication mode SYNC requires RPO value to be Zero")
return nil, status.Error(codes.InvalidArgument, "replication mode SYNC requires RPO value to be Zero")
}
namespace := ""
if ignoreNS, ok := params[s.WithRP(KeyReplicationIgnoreNamespaces)]; ok && ignoreNS == "false" {
Expand Down Expand Up @@ -370,11 +374,13 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
}
}

// Pass the VolumeGroup to the creator so it can create the new volume inside the vg
if c, ok := creator.(*SCSICreator); ok {
c.vg = &vg
}
case common.Metro:
// handle Metro mode where metro is configured directly on the volume (or volume group if requested)
// handle Metro mode where metro is configured directly on the volume
// Note: Metro on volume group support is not added
log.Info("Metro replication mode requested")

// Get specified remote system object for its ID
Expand All @@ -383,13 +389,7 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
return nil, status.Errorf(codes.Internal, "can't query remote system by name: %s", err.Error())
}

// TODO If volumeGroup input is specified in SC - Verify VolumeGroup exists, if not create one
// There shouldn't be any protection policy on it with replication rule
// Cannot configure Metro on empty VG. Configure after volume is added
// Check if the above sync/async block can be optimized w.r.t volume group calls
// isMetroVolumeGroup = true

isMetroVolume = true // set to true if volume group is not specified
isMetroVolume = true // set to true
default:
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but invalid replication mode specified in storage class")
}
Expand All @@ -413,7 +413,6 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
}

metroVolumeIDSuffix := ""

if isMetroVolume {
// Configure Metro on volume
volID := volumeResponse.VolumeId
Expand Down Expand Up @@ -444,11 +443,6 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
}
// Build the metro volume handle suffix
metroVolumeIDSuffix = ":" + replicationSession.RemoteResourceID + "/" + remoteSystem.SerialNumber
} else if isMetroVolumeGroup {
// TODO configure Metro on volume group if it is first time
// else pause and resume metro session for adding new volumes
// Session needs to be paused before the new volume can be added (before creator.Create()) and then resumed later here.
log.Warn("Configuring Metro on volume group, not yet implemented.")
}

// Fetch the service tag
Expand Down Expand Up @@ -482,7 +476,7 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
return nil, status.Error(codes.InvalidArgument, "volume ID is required")
}

id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
id, arrayID, protocol, remoteVolumeID, _, err := array.ParseVolumeID(ctx, id, s.DefaultArray(), nil)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
return &csi.DeleteVolumeResponse{}, nil
Expand Down Expand Up @@ -572,7 +566,6 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
return nil, err

} else if protocol == "scsi" {
// query volume groups?
vgs, err := arr.GetClient().GetVolumeGroupsByVolumeID(ctx, id)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); !ok || !apiError.NotFound() {
Expand All @@ -586,8 +579,11 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
// TODO: Maybe adding volumegroup id/name to volume id can help?
_, err := arr.GetClient().RemoveMembersFromVolumeGroup(ctx, &gopowerstore.VolumeGroupMembers{VolumeIDs: []string{id}}, vgs.VolumeGroup[0].ID)
if err != nil {
// TODO: check for idempotency cases
return nil, err
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.VolumeAlreadyRemovedFromVolumeGroup() { // idempotency check
log.Debugf("Volume %s has already been removed from volume group %s", id, vgs.VolumeGroup[0].ID) // continue to delete volume
} else {
return nil, status.Errorf(codes.Internal, "failed to remove volume %s from volume group: %s", id, err.Error())
}
}

// Unassign protection policy
Expand Down Expand Up @@ -623,6 +619,8 @@ func (s *Service) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest
if err != nil {
return nil, status.Errorf(codes.Internal, "failure ending metro session on volume: %s", err.Error())
}
} else if remoteVolumeID != "" {
log.Debugf("Expected metro session for volume %s, but it seems to have been already removed.", id)
}

// Delete volume
Expand Down Expand Up @@ -1351,9 +1349,68 @@ func (s *Service) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReque
}, nil
}

func pauseMetroSession(ctx context.Context, metroSessionID string, arr *array.PowerStoreArray) (paused bool, err error) {
metroSession, err := arr.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session %s", metroSessionID)
}

// confirm the session is in a state we can pause from.
if metroSession.State != gopowerstore.RsStateOk &&
metroSession.State != gopowerstore.RsStateSynchronizing &&
metroSession.State != gopowerstore.RsStatePaused &&
metroSession.State != gopowerstore.RsStateSystemPaused &&
metroSession.State != gopowerstore.RsStateFractured {
return false, fmt.Errorf("could not pause the metro replication session, %s, because the session is not in expected state to pause", metroSession.ID)
}

if metroSession.State != gopowerstore.RsStatePaused {
log.Debugf("pausing metro replication session, %s", metroSession.ID)

// pause the replication session
_, err := arr.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionPause, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be paused: %s", metroSession.ID, err.Error())
}
} else {
log.Debugf("metro replication session, %s, already paused", metroSession.ID)
}
return true, nil
}

func resumeMetroSession(ctx context.Context, metroSessionID string, array *array.PowerStoreArray) (resumed bool, err error) {
metroSession, err := array.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session: %s", err.Error())
}

// nothing to do if not paused
if metroSession.State == gopowerstore.RsStateOk ||
metroSession.State == gopowerstore.RsStateSynchronizing ||
metroSession.State == gopowerstore.RsStateResuming ||
metroSession.State == gopowerstore.RsStateSwitchingToMetroSync ||
metroSession.State == gopowerstore.RsStateFractured {
log.Debugf("metro replication session, %s, already resumed", metroSession.ID)
return false, nil
}

// metro session can only be resumed if it is in 'paused' state
if metroSession.State != gopowerstore.RsStatePaused {
return false, errors.New("the metro session must be in 'paused' state before resuming")
}

log.Debugf("resuming metro replication session %s", metroSession.ID)
_, err = array.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionResume, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be resumed: %s", metroSession.ID, err.Error())
}

return true, nil
}

// ControllerExpandVolume resizes Volume or FileSystem by increasing available volume capacity in the storage array.
func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
id, arrayID, protocol, _, _, err := array.ParseVolumeID(ctx, req.VolumeId, s.DefaultArray(), nil)
id, arrayID, protocol, remoteVolumeID, _, err := array.ParseVolumeID(ctx, req.VolumeId, s.DefaultArray(), nil)
if err != nil {
return nil, status.Errorf(codes.OutOfRange, "unable to parse the volume id")
}
Expand All @@ -1363,24 +1420,61 @@ func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.Controlle
return nil, status.Errorf(codes.OutOfRange, "volume exceeds allowed limit")
}

array, ok := s.Arrays()[arrayID]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "unable to find array with ID %s", arrayID)
}
client := array.Client

if protocol == "scsi" {
vol, err := s.Arrays()[arrayID].Client.GetVolume(ctx, id)
vol, err := client.GetVolume(ctx, id)
if err != nil {
return nil, status.Errorf(codes.OutOfRange, "detected SCSI protocol but wasn't able to fetch the volume info")
return nil, status.Error(codes.NotFound, "detected SCSI protocol but wasn't able to fetch the volume info")
}

volExpanded := false // to return appropriate response based on whether the volume is expanded or not
isMetro := remoteVolumeID != ""
if isMetro && vol.MetroReplicationSessionID == "" {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session ID is empty for metro volume", vol.Name)
}

if vol.Size < requiredBytes {
_, err = s.Arrays()[arrayID].Client.ModifyVolume(context.Background(), &gopowerstore.VolumeModify{Size: requiredBytes}, id)
if isMetro {
// must pause metro session before modifying the volume
_, err = pauseMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be paused: %s", vol.Name, err.Error())
}
}

_, err = client.ModifyVolume(context.Background(), &gopowerstore.VolumeModify{Size: requiredBytes}, id)
if err != nil {
return nil, err
return nil, status.Errorf(codes.Internal, "unable to modify volume size: %s", err.Error())
}
volExpanded = true
}

// check the metro session state and resume if necessary
// in case the previous request failed after expanding the volume, resume the session
if isMetro {
volExpanded, err = resumeMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be resumed: %s", vol.Name, err.Error())
}
}
if volExpanded {
return &csi.ControllerExpandVolumeResponse{CapacityBytes: requiredBytes, NodeExpansionRequired: true}, nil
}
return &csi.ControllerExpandVolumeResponse{}, nil
}
fs, err := s.Arrays()[arrayID].Client.GetFS(ctx, id)

fs, err := client.GetFS(ctx, id)
if err == nil {
if fs.SizeTotal < requiredBytes {
_, err = s.Arrays()[arrayID].Client.ModifyFS(context.Background(), &gopowerstore.FSModify{Size: int(requiredBytes + ReservedSize)}, id)
_, err = client.ModifyFS(context.Background(), &gopowerstore.FSModify{Size: int(requiredBytes + ReservedSize)}, id)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 6d7a601

Please sign in to comment.