Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: remove grpc and csi-addons import from rbd/replication.go #3884

Merged
merged 1 commit into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,12 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
case librbd.MirrorImageDisabling:
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", volumeID)
case librbd.MirrorImageEnabled:
return corerbd.DisableVolumeReplication(rbdVol, mirroringInfo, force)
err = rbdVol.DisableVolumeReplication(mirroringInfo, force)
if err != nil {
return nil, getGRPCError(err)
}

return &replication.DisableVolumeReplicationResponse{}, nil
default:
return nil, status.Errorf(codes.InvalidArgument, "image is in %s Mode", mirroringInfo.State)
}
Expand Down Expand Up @@ -627,9 +632,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,

err = rbdVol.ResyncVol(localStatus, req.Force)
if err != nil {
log.ErrorLog(ctx, err.Error())

return nil, err
return nil, getGRPCError(err)
}

err = checkVolumeResyncStatus(localStatus)
Expand All @@ -649,6 +652,32 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return resp, nil
}

func getGRPCError(err error) error {
if err == nil {
return status.Error(codes.OK, codes.OK.String())
}

errorStatusMap := map[error]codes.Code{
corerbd.ErrFetchingLocalState: codes.Internal,
corerbd.ErrResyncImageFailed: codes.Internal,
corerbd.ErrDisableImageMirroringFailed: codes.Internal,
corerbd.ErrFetchingMirroringInfo: codes.Internal,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
}

for e, code := range errorStatusMap {
if errors.Is(err, e) {
return status.Error(code, err.Error())
}
}

// Handle any other non nil error not listed in the map
return status.Error(codes.Unknown, err.Error())
nixpanic marked this conversation as resolved.
Show resolved Hide resolved
}

// GetVolumeReplicationInfo extracts the RBD volume information from the volumeID, If the
// image is present, mirroring is enabled and the image is in primary state.
func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
Expand Down
73 changes: 73 additions & 0 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rbd

import (
"context"
"errors"
"reflect"
"strings"
"testing"
Expand All @@ -27,6 +28,9 @@ import (

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -494,3 +498,72 @@ func TestValidateLastSyncTime(t *testing.T) {
})
}
}

func TestGetGRPCError(t *testing.T) {
t.Parallel()
tests := []struct {
name string
err error
expectedErr error
}{
{
Rakshith-R marked this conversation as resolved.
Show resolved Hide resolved
name: "FetchingLocalStateFailed",
err: corerbd.ErrFetchingLocalState,
expectedErr: status.Error(codes.Internal, corerbd.ErrFetchingLocalState.Error()),
},
{
name: "ResyncImageFailed",
err: corerbd.ErrResyncImageFailed,
expectedErr: status.Error(codes.Internal, corerbd.ErrResyncImageFailed.Error()),
},
{
name: "DisableImageMirroringFailed",
err: corerbd.ErrDisableImageMirroringFailed,
expectedErr: status.Error(codes.Internal, corerbd.ErrDisableImageMirroringFailed.Error()),
},
{
name: "FetchingMirroringInfoFailed",
err: corerbd.ErrFetchingMirroringInfo,
expectedErr: status.Error(codes.Internal, corerbd.ErrFetchingMirroringInfo.Error()),
},
{
name: "InvalidArgument",
err: corerbd.ErrInvalidArgument,
expectedErr: status.Error(codes.InvalidArgument, corerbd.ErrInvalidArgument.Error()),
},
{
name: "Aborted",
err: corerbd.ErrAborted,
expectedErr: status.Error(codes.Aborted, corerbd.ErrAborted.Error()),
},
{
name: "FailedPrecondition",
err: corerbd.ErrFailedPrecondition,
expectedErr: status.Error(codes.FailedPrecondition, corerbd.ErrFailedPrecondition.Error()),
},
{
name: "Unavailable",
err: corerbd.ErrUnavailable,
expectedErr: status.Error(codes.Unavailable, corerbd.ErrUnavailable.Error()),
},
{
name: "InvalidError",
err: errors.New("some error"),
expectedErr: status.Error(codes.Unknown, "some error"),
},
{
name: "NilError",
err: nil,
expectedErr: status.Error(codes.OK, "ok string"),
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
result := getGRPCError(tt.err)
assert.Equal(t, tt.expectedErr, result)
})
}
}
18 changes: 18 additions & 0 deletions internal/rbd/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,22 @@ var (
// ErrLastSyncTimeNotFound is returned when last sync time is not found for
// the image.
ErrLastSyncTimeNotFound = errors.New("last sync time not found")
// ErrFailedPrecondition is returned when operation is rejected because the system is not in a state
// required for the operation's execution.
ErrFailedPrecondition = errors.New("system is not in a state required for the operation's execution")
// ErrUnavailable is returned when the image needs to be recreated
// locally and may be corrected by retrying with a backoff.
ErrUnavailable = errors.New("image needs to be recreated")
// ErrAborted is returned when the operation is aborted.
ErrAborted = errors.New("operation got aborted")
// ErrInvalidArgument is returned when the client specified an invalid argument.
ErrInvalidArgument = errors.New("invalid arguments provided")
// ErrFetchingLocalState is returned when the operation to fetch local state fails.
ErrFetchingLocalState = errors.New("failed to get local state")
// ErrDisableImageMirroringFailed is returned when the operation to disable image mirroring fails.
ErrDisableImageMirroringFailed = errors.New("failed to disable image mirroring")
// ErrFetchingMirroringInfo is returned when the operation to fetch mirroring info of image fails.
ErrFetchingMirroringInfo = errors.New("failed to get mirroring info of image")
// ErrResyncImageFailed is returned when the operation to resync the image fails.
ErrResyncImageFailed = errors.New("failed to resync image")
)
41 changes: 18 additions & 23 deletions internal/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,29 @@ package rbd

import (
"context"
"fmt"
"strings"

librbd "github.com/ceph/go-ceph/rbd"
"github.com/csi-addons/spec/lib/go/replication"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func (rv *rbdVolume) ResyncVol(localStatus librbd.SiteMirrorImageStatus, force bool) error {
if resyncRequired(localStatus) {
// If the force option is not set return the error message to retry
// with Force option.
if !force {
return status.Errorf(codes.FailedPrecondition,
"image is in %q state, description (%s). Force resync to recover volume",
localStatus.State, localStatus.Description)
return fmt.Errorf("%w: image is in %q state, description (%s). Force resync to recover volume",
ErrFailedPrecondition, localStatus.State, localStatus.Description)
}
err := rv.resyncImage()
if err != nil {
return status.Error(codes.Internal, err.Error())
return fmt.Errorf("%w: failed to resync image: %w", ErrResyncImageFailed, err)
}

// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return status.Error(codes.Unavailable, "awaiting initial resync due to split brain")
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrUnavailable)
}

return nil
Expand Down Expand Up @@ -85,10 +82,10 @@ func resyncRequired(localStatus librbd.SiteMirrorImageStatus) bool {
return false
}

func DisableVolumeReplication(rbdVol *rbdVolume,
func (rv *rbdVolume) DisableVolumeReplication(
mirroringInfo *librbd.MirrorImageInfo,
force bool,
) (*replication.DisableVolumeReplicationResponse, error) {
) error {
if !mirroringInfo.Primary {
// Return success if the below condition is met
// Local image is secondary
Expand All @@ -102,32 +99,30 @@ func DisableVolumeReplication(rbdVol *rbdVolume,
// disabled the image on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the volume
// replication Kubernetes artifacts after failback operation.
localStatus, rErr := rbdVol.GetLocalState()
localStatus, rErr := rv.GetLocalState()
if rErr != nil {
return nil, status.Error(codes.Internal, rErr.Error())
return fmt.Errorf("%w: %w", ErrFetchingLocalState, rErr)
}
if localStatus.Up && localStatus.State == librbd.MirrorImageStatusStateReplaying {
return &replication.DisableVolumeReplicationResponse{}, nil
return nil
}

return nil, status.Errorf(codes.InvalidArgument,
"secondary image status is up=%t and state=%s",
localStatus.Up,
localStatus.State)
return fmt.Errorf("%w: secondary image status is up=%t and state=%s",
ErrInvalidArgument, localStatus.Up, localStatus.State)
}
err := rbdVol.DisableImageMirroring(force)
err := rv.DisableImageMirroring(force)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return fmt.Errorf("%w: %w", ErrDisableImageMirroringFailed, err)
Rakshith-R marked this conversation as resolved.
Show resolved Hide resolved
}
// the image state can be still disabling once we disable the mirroring
// check the mirroring is disabled or not
mirroringInfo, err = rbdVol.GetImageMirroringInfo()
mirroringInfo, err = rv.GetImageMirroringInfo()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return fmt.Errorf("%w: %w", ErrFetchingMirroringInfo, err)
}
if mirroringInfo.State == librbd.MirrorImageDisabling {
return nil, status.Errorf(codes.Aborted, "%s is in disabling state", rbdVol.VolID)
return fmt.Errorf("%w: %q is in disabling state", ErrAborted, rv.VolID)
}

return &replication.DisableVolumeReplicationResponse{}, nil
return nil
}