Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove CSI driver-initiated pause and resume on metro volumes #360

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 8 additions & 62 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,61 +1340,19 @@ func (s *Service) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsReque
}, nil
}

func pauseMetroSession(ctx context.Context, metroSessionID string, arr *array.PowerStoreArray) (paused bool, err error) {
func isPausedMetroSession(ctx context.Context, metroSessionID string, arr *array.PowerStoreArray) (paused bool, err error) {
metroSession, err := arr.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session %s", metroSessionID)
}

// confirm the session is in a state we can pause from.
if metroSession.State != gopowerstore.RsStateOk &&
metroSession.State != gopowerstore.RsStateSynchronizing &&
metroSession.State != gopowerstore.RsStatePaused &&
metroSession.State != gopowerstore.RsStateSystemPaused &&
metroSession.State != gopowerstore.RsStateFractured {
return false, fmt.Errorf("could not pause the metro replication session, %s, because the session is not in expected state to pause", metroSession.ID)
}

if metroSession.State != gopowerstore.RsStatePaused {
log.Debugf("pausing metro replication session, %s", metroSession.ID)

// pause the replication session
_, err := arr.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionPause, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be paused: %s", metroSession.ID, err.Error())
}
} else {
log.Debugf("metro replication session, %s, already paused", metroSession.ID)
}
return true, nil
}

func resumeMetroSession(ctx context.Context, metroSessionID string, array *array.PowerStoreArray) (resumed bool, err error) {
metroSession, err := array.Client.GetReplicationSessionByID(ctx, metroSessionID)
if err != nil {
return false, fmt.Errorf("could not get metro replication session: %s", err.Error())
}

// nothing to do if not paused
if metroSession.State == gopowerstore.RsStateOk ||
metroSession.State == gopowerstore.RsStateSynchronizing ||
metroSession.State == gopowerstore.RsStateResuming ||
metroSession.State == gopowerstore.RsStateSwitchingToMetroSync ||
metroSession.State == gopowerstore.RsStateFractured {
log.Debugf("metro replication session, %s, already resumed", metroSession.ID)
return false, nil
}
log.Debugf("checking if metro replication session, %s, is paused", metroSession.ID)

// metro session can only be resumed if it is in 'paused' state
if metroSession.State != gopowerstore.RsStatePaused {
return false, errors.New("the metro session must be in 'paused' state before resuming")
return false, errors.New("metro replication session not in 'paused' state")
}

log.Debugf("resuming metro replication session %s", metroSession.ID)
_, err = array.Client.ExecuteActionOnReplicationSession(ctx, metroSession.ID, gopowerstore.RsActionResume, nil)
if err != nil {
return false, fmt.Errorf("metro replication session, %s, could not be resumed: %s", metroSession.ID, err.Error())
}
log.Debugf("metro replication session, %s, is paused", metroSession.ID)

return true, nil
}
Expand Down Expand Up @@ -1423,7 +1381,6 @@ func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.Controlle
return nil, status.Error(codes.NotFound, "detected SCSI protocol but wasn't able to fetch the volume info")
}

volExpanded := false // to return appropriate response based on whether the volume is expanded or not
isMetro := remoteVolumeID != ""
if isMetro && vol.MetroReplicationSessionID == "" {
return nil, status.Errorf(codes.Internal,
Expand All @@ -1433,32 +1390,21 @@ func (s *Service) ControllerExpandVolume(ctx context.Context, req *csi.Controlle
if vol.Size < requiredBytes {
if isMetro {
// must pause metro session before modifying the volume
_, err = pauseMetroSession(ctx, vol.MetroReplicationSessionID, array)
_, err = isPausedMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be paused: %s", vol.Name, err.Error())
"failed to expand the volume, %s, because the metro replication session is not paused. please pause the metro replication session: %s",
vol.Name, err.Error())
}
}

_, err = client.ModifyVolume(context.Background(), &gopowerstore.VolumeModify{Size: requiredBytes}, id)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to modify volume size: %s", err.Error())
}
volExpanded = true
}

// check the metro session state and resume if necessary
// in case the previous request failed after expanding the volume, resume the session
if isMetro {
volExpanded, err = resumeMetroSession(ctx, vol.MetroReplicationSessionID, array)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to expand the volume %s because the metro replication session could not be resumed: %s", vol.Name, err.Error())
}
}
if volExpanded {
return &csi.ControllerExpandVolumeResponse{CapacityBytes: requiredBytes, NodeExpansionRequired: true}, nil
}

return &csi.ControllerExpandVolumeResponse{}, nil
}

Expand Down
104 changes: 3 additions & 101 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2292,20 +2292,11 @@ var _ = ginkgo.Describe("CSIControllerService", func() {
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
// Return paused to resume session
// Return metro session status as paused
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStatePaused,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionResume, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
res, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)
Expand Down Expand Up @@ -2381,111 +2372,22 @@ var _ = ginkgo.Describe("CSIControllerService", func() {
gomega.Expect(err.Error()).To(gomega.ContainSubstring("could not get metro replication session"))
})

ginkgo.It("should fail when trying to pause metro session due to API error", func() {
e := errors.New("some-api-error")
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
clientMock.On("ModifyVolume",
mock.Anything,
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), e)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("because the metro replication session could not be paused"))
})

ginkgo.It("should fail when trying to resume metro session due to API error", func() {
e := errors.New("some-api-error")
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
clientMock.On("ModifyVolume",
mock.Anything,
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
// Return paused to resume session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStatePaused,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionResume, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), e)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("because the metro replication session could not be resumed"))
})

ginkgo.It("should fail when trying to pause metro session due to unexpected state", func() {
ginkgo.It("should fail if metro session is not paused", func() {
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
// Return error state for pause failure
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateError,
}, nil).Times(1)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("because the session is not in expected state to pause"))
})

ginkgo.It("should fail when trying to resume metro session due to unexpected state", func() {
clientMock.On("GetVolume", mock.Anything, validBaseVolID).Return(gopowerstore.Volume{
MetroReplicationSessionID: validSessionID,
Size: validVolSize,
}, nil)
clientMock.On("ModifyVolume",
mock.Anything,
mock.AnythingOfType("*gopowerstore.VolumeModify"),
validBaseVolID).
Return(gopowerstore.EmptyResponse(""), nil)
// Return okay to pause session
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateOk,
}, nil).Times(1)
// Return error state for resume failure
clientMock.On("GetReplicationSessionByID", mock.Anything, validSessionID).Return(gopowerstore.ReplicationSession{
ID: validSessionID,
State: gopowerstore.RsStateError,
}, nil).Times(1)
clientMock.On("ExecuteActionOnReplicationSession", mock.Anything, validSessionID, gopowerstore.RsActionPause, (*gopowerstore.FailoverParams)(nil)).
Return(gopowerstore.EmptyResponse(""), nil)

req := getTypicalControllerExpandRequest(validMetroBlockVolumeID, validVolSize*2)
_, err := ctrlSvc.ControllerExpandVolume(context.Background(), req)

gomega.Expect(err).ToNot(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("the metro session must be in 'paused' state before resuming"))
gomega.Expect(err.Error()).To(gomega.ContainSubstring("metro replication session not in 'paused' state"))
})
})

Expand Down