Skip to content

Commit

Permalink
rbd: migration of replication service to csi-addon
Browse files Browse the repository at this point in the history
This commit removes the gRPC imports from internal/rbd/replication.go,
replaces the gRPC status errors there with regular Go errors, and converts
those errors into gRPC responses in the csi-addons replication service.

Signed-off-by: riya-singhal31 <rsinghal@redhat.com>
  • Loading branch information
riya-singhal31 authored and mergify[bot] committed Jun 22, 2023
1 parent 66785c3 commit cdaa926
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 27 deletions.
37 changes: 33 additions & 4 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,12 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabling:
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled:
return corerbd.DisableVolumeReplication(rbdVol, mirroringInfo, force)
err = rbdVol.DisableVolumeReplication(mirroringInfo, force)
if err != nil {
return nil, getGRPCError(err)
}

return &replication.DisableVolumeReplicationResponse{}, nil
default:
return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State)
}
Expand Down Expand Up @@ -627,9 +632,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,

err = rbdVol.ResyncVol(localStatus, req.Force)
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, err
return nil, getGRPCError(err)
}

err = checkVolumeResyncStatus(localStatus)
Expand All @@ -649,6 +652,32 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return resp, nil
}

func getGRPCError(err error) error {
if err == nil {
return status.Error(codes.OK, codes.OK.String())
}

errorStatusMap := map[error]codes.Code{
corerbd.ErrFetchingLocalState: codes.Internal,
corerbd.ErrResyncImageFailed: codes.Internal,
corerbd.ErrDisableImageMirroringFailed: codes.Internal,
corerbd.ErrFetchingMirroringInfo: codes.Internal,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
}

for e, code := range errorStatusMap {
if errors.Is(err, e) {
return status.Error(code, err.Error())
}
}

// Handle any other non nil error not listed in the map
return status.Error(codes.Unknown, err.Error())
}

// GetVolumeReplicationInfo extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in primary state.
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
Expand Down
73 changes: 73 additions & 0 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rbd

import (
"context"
"errors"
"reflect"
"strings"
"testing"
Expand All @@ -27,6 +28,9 @@ import (

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -494,3 +498,72 @@ func TestValidateLastSyncTime(t *testing.T) {
})
}
}

// TestGetGRPCError verifies that getGRPCError maps each corerbd sentinel
// error to the expected gRPC status, maps unlisted errors to codes.Unknown,
// and returns nil for a nil input.
func TestGetGRPCError(t *testing.T) {
	t.Parallel()
	tests := []struct {
		name        string
		err         error
		expectedErr error
	}{
		{
			name:        "FetchingLocalStateFailed",
			err:         corerbd.ErrFetchingLocalState,
			expectedErr: status.Error(codes.Internal, corerbd.ErrFetchingLocalState.Error()),
		},
		{
			name:        "ResyncImageFailed",
			err:         corerbd.ErrResyncImageFailed,
			expectedErr: status.Error(codes.Internal, corerbd.ErrResyncImageFailed.Error()),
		},
		{
			name:        "DisableImageMirroringFailed",
			err:         corerbd.ErrDisableImageMirroringFailed,
			expectedErr: status.Error(codes.Internal, corerbd.ErrDisableImageMirroringFailed.Error()),
		},
		{
			name:        "FetchingMirroringInfoFailed",
			err:         corerbd.ErrFetchingMirroringInfo,
			expectedErr: status.Error(codes.Internal, corerbd.ErrFetchingMirroringInfo.Error()),
		},
		{
			name:        "InvalidArgument",
			err:         corerbd.ErrInvalidArgument,
			expectedErr: status.Error(codes.InvalidArgument, corerbd.ErrInvalidArgument.Error()),
		},
		{
			name:        "Aborted",
			err:         corerbd.ErrAborted,
			expectedErr: status.Error(codes.Aborted, corerbd.ErrAborted.Error()),
		},
		{
			name:        "FailedPrecondition",
			err:         corerbd.ErrFailedPrecondition,
			expectedErr: status.Error(codes.FailedPrecondition, corerbd.ErrFailedPrecondition.Error()),
		},
		{
			name:        "Unavailable",
			err:         corerbd.ErrUnavailable,
			expectedErr: status.Error(codes.Unavailable, corerbd.ErrUnavailable.Error()),
		},
		{
			name:        "InvalidError",
			err:         errors.New("some error"),
			expectedErr: status.Error(codes.Unknown, "some error"),
		},
		{
			// status.Error yields nil for codes.OK, so a nil input is
			// expected to produce a nil error; state that directly instead
			// of hiding it behind a message that is never compared.
			name:        "NilError",
			err:         nil,
			expectedErr: nil,
		},
	}

	for _, tt := range tests {
		tt := tt // capture the range variable for the parallel subtest
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			result := getGRPCError(tt.err)
			assert.Equal(t, tt.expectedErr, result)
		})
	}
}
18 changes: 18 additions & 0 deletions internal/rbd/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,22 @@ var (
// ErrLastSyncTimeNotFound is returned when last sync time is not found for
// the image.
ErrLastSyncTimeNotFound = errors.New("last sync time not found")
// ErrFailedPrecondition is returned when operation is rejected because the system is not in a state
// required for the operation's execution.
ErrFailedPrecondition = errors.New("system is not in a state required for the operation's execution")
// ErrUnavailable is returned when the image needs to be recreated
// locally and may be corrected by retrying with a backoff.
ErrUnavailable = errors.New("image needs to be recreated")
// ErrAborted is returned when the operation is aborted.
ErrAborted = errors.New("operation got aborted")
// ErrInvalidArgument is returned when the client specified an invalid argument.
ErrInvalidArgument = errors.New("invalid arguments provided")
// ErrFetchingLocalState is returned when the operation to fetch local state fails.
ErrFetchingLocalState = errors.New("failed to get local state")
// ErrDisableImageMirroringFailed is returned when the operation to disable image mirroring fails.
ErrDisableImageMirroringFailed = errors.New("failed to disable image mirroring")
// ErrFetchingMirroringInfo is returned when the operation to fetch mirroring info of image fails.
ErrFetchingMirroringInfo = errors.New("failed to get mirroring info of image")
// ErrResyncImageFailed is returned when the operation to resync the image fails.
ErrResyncImageFailed = errors.New("failed to resync image")
)
41 changes: 18 additions & 23 deletions internal/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,29 @@ package rbd

import (
"context"
"fmt"
"strings"

librbd "github.com/ceph/go-ceph/rbd"
"github.com/csi-addons/spec/lib/go/replication"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus, force bool) error {
if resyncRequired(localStatus) {
// If the force option is not set return the error message to retry
// with Force option.
if !force {
return status.Errorf(codes.FailedPrecondition,
"image is in %q state, description (%s). Force resync to recover volume",
localStatus.State, localStatus.Description)
return fmt.Errorf("%w: image is in %q state, description (%s). Force resync to recover volume",
ErrFailedPrecondition, localStatus.State, localStatus.Description)
}
err := rv.resyncImage()
if err != nil {
return status.Error(codes.Internal, err.Error())
return fmt.Errorf("%w: failed to resync image: %w", ErrResyncImageFailed, err)
}

// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return status.Error(codes.Unavailable, "awaiting initial resync due to split brain")
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
}

return nil
Expand Down Expand Up @@ -85,10 +82,10 @@ func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool {
return false
}

func DisableVolumeReplication(rbdVol *rbdVolume,
func (rv *rbdVolume) DisableVolumeReplication(
mirroringInfo *librbd.MirrorImageInfo,
force bool,
) (*replication.DisableVolumeReplicationResponse, error) {
) error {
if !mirroringInfo.Primary {
// Return success if the below condition is met
// Local image is secondary
Expand All @@ -102,32 +99,30 @@ func DisableVolumeReplication(rbdVol *rbdVolume,
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
localStatus, rErr := rbdVol.GetLocalState()
localStatus, rErr := rv.GetLocalState()
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
return fmt.Errorf("%w: %w", ErrFetchingLocalState, rErr)
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
return &replication.DisableVolumeReplicationResponse{}, nil
return nil
}

return nil, status.Errorf(codes.InvalidArgument,
"secondary image status is up=%t and state=%s",
localStatus.Up,
localStatus.State)
return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.Up, localStatus.State)
}
err := rbdVol.DisableImageMirroring(force)
err := rv.DisableImageMirroring(force)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return fmt.Errorf("%w: %w", ErrDisableImageMirroringFailed, err)
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
mirroringInfo, err = rbdVol.GetImageMirroringInfo()
mirroringInfo, err = rv.GetImageMirroringInfo()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return fmt.Errorf("%w: %w", ErrFetchingMirroringInfo, err)
}
if mirroringInfo.State == librbd.MirrorImageDisabling {
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID)
return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID)
}

return &replication.DisableVolumeReplicationResponse{}, nil
return nil
}

0 comments on commit cdaa926

Please sign in to comment.