Skip to content

Commit

Permalink
Remove CSI driver-initiated pause and resume on metro volumes (#360)
Browse files Browse the repository at this point in the history
* user must manually pause metro session to expand metro volume
  • Loading branch information
lukeatdell authored Oct 21, 2024
1 parent eee3ec6 commit fbdb65e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 163 deletions.
70 changes: 8 additions & 62 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,61 +1340,19 @@ func (s *Service) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReque
}, nil
}

func pauseMetroSession(ctx context.Context, metroSessionID string, arr *array.PowerStoreArray) (paused bool, err error) {
func isPausedMetroSession(ctx context.Context, metroSessionID string, arr *array.PowerStoreArray) (paused bool, err error) {
metroSession, err := arr.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session %s", metroSessionID)
}

// confirm the session is in a state we can pause from.
if metroSession.State != gopowerstore.RsStateOk &&
metroSession.State != gopowerstore.RsStateSynchronizing &&
metroSession.State != gopowerstore.RsStatePaused &&
metroSession.State != gopowerstore.RsStateSystemPaused &&
metroSession.State != gopowerstore.RsStateFractured {
return false, fmt.Errorf("could not pause the metro replication session, %s, because the session is not in expected state to pause", metroSession.ID)
}

if metroSession.State != gopowerstore.RsStatePaused {
log.Debugf("pausing metro replication session, %s", metroSession.ID)

// pause the replication session
_, err := arr.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionPause, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be paused: %s", metroSession.ID, err.Error())
}
} else {
log.Debugf("metro replication session, %s, already paused", metroSession.ID)
}
return true, nil
}

func resumeMetroSession(ctx context.Context, metroSessionID string, array *array.PowerStoreArray) (resumed bool, err error) {
metroSession, err := array.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session: %s", err.Error())
}

// nothing to do if not paused
if metroSession.State == gopowerstore.RsStateOk ||
metroSession.State == gopowerstore.RsStateSynchronizing ||
metroSession.State == gopowerstore.RsStateResuming ||
metroSession.State == gopowerstore.RsStateSwitchingToMetroSync ||
metroSession.State == gopowerstore.RsStateFractured {
log.Debugf("metro replication session, %s, already resumed", metroSession.ID)
return false, nil
}
log.Debugf("checking if metro replication session, %s, is paused", metroSession.ID)

// metro session can only be resumed if it is in 'paused' state
if metroSession.State != gopowerstore.RsStatePaused {
return false, errors.New("the metro session must be in 'paused' state before resuming")
return false, errors.New("metro replication session not in 'paused' state")
}

log.Debugf("resuming metro replication session %s", metroSession.ID)
_, err = array.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionResume, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be resumed: %s", metroSession.ID, err.Error())
}
log.Debugf("metro replication session, %s, is paused", metroSession.ID)

return true, nil
}
Expand Down Expand Up @@ -1423,7 +1381,6 @@ func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.Controlle
return nil, status.Error(codes.NotFound, "detected SCSI protocol but wasn't able to fetch the volume info")
}

volExpanded := false // to return appropriate response based on whether the volume is expanded or not
isMetro := remoteVolumeID != ""
if isMetro && vol.MetroReplicationSessionID == "" {
return nil, status.Errorf(codes.Internal,
Expand All @@ -1433,32 +1390,21 @@ func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.Controlle
if vol.Size < requiredBytes {
if isMetro {
// must pause metro session before modifying the volume
_, err = pauseMetroSession(ctx, vol.MetroReplicationSessionID, array)
_, err = isPausedMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be paused: %s", vol.Name, err.Error())
"failed to expand the volume, %s, because the metro replication session is not paused. please pause the metro replication session: %s",
vol.Name, err.Error())
}
}

_, err = client.ModifyVolume(context.Background(), &gopowerstore.VolumeModify{Size: requiredBytes}, id)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to modify volume size: %s", err.Error())
}
volExpanded = true
}

// check the metro session state and resume if necessary
// in case the previous request failed after expanding the volume, resume the session
if isMetro {
volExpanded, err = resumeMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be resumed: %s", vol.Name, err.Error())
}
}
if volExpanded {
return &csi.ControllerExpandVolumeResponse{CapacityBytes: requiredBytes, NodeExpansionRequired: true}, nil
}

return &csi.ControllerExpandVolumeResponse{}, nil
}

Expand Down
104 changes: 3 additions & 101 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2292,20 +2292,11 @@ var _ = ginkgo.Describe("CSIControllerService", func() {
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
// Return paused to resume session
// Return metro session status as paused
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStatePaused,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionResume, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
res, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)
Expand Down Expand Up @@ -2381,111 +2372,22 @@ var _ = ginkgo.Describe("CSIControllerService", func() {
gomega.Expect(err.Error()).To(gomega.ContainSubstring("could not get metro replication session"))
})

ginkgo.It("should fail when trying to pause metro session due to API error", func() {
e := errors.New("some-api-error")
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
clientMock.On("ModifyVolume",
mock.Anything,
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), e)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("because the metro replication session could not be paused"))
})

ginkgo.It("should fail when trying to resume metro session due to API error", func() {
e := errors.New("some-api-error")
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
clientMock.On("ModifyVolume",
mock.Anything,
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
// Return paused to resume session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStatePaused,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionResume, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), e)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("because the metro replication session could not be resumed"))
})

ginkgo.It("should fail when trying to pause metro session due to unexpected state", func() {
ginkgo.It("should fail if metro session is not paused", func() {
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
// Return error state for pause failure
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateError,
}, nil).Times(1)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("because the session is not in expected state to pause"))
})

ginkgo.It("should fail when trying to resume metro session due to unexpected state", func() {
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
clientMock.On("ModifyVolume",
mock.Anything,
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
// Return error state for resume failure
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateError,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("the metro session must be in 'paused' state before resuming"))
gomega.Expect(err.Error()).To(gomega.ContainSubstring("metro replication session not in 'paused' state"))
})
})

Expand Down

0 comments on commit fbdb65e

Please sign in to comment.