Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new replication parameter 'mode' to storage class #341

Merged
merged 4 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/controller/base.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright © 2021-2023 Dell Inc. or its subsidiaries. All Rights Reserved.
* Copyright © 2021-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,8 @@ const (
VolumeSizeMultiple = 8192
// MaxVolumeNameLength max length for the volume name
MaxVolumeNameLength = 128
// ReplicationPrefix represents replication prefix
ReplicationPrefix = "replication.storage.dell.com"
// ErrUnknownAccessType represents error message for unknown access type
ErrUnknownAccessType = "unknown access type is not Block or Mount"
// ErrUnknownAccessMode represents error message for unknown access mode
Expand All @@ -54,6 +56,8 @@ const (
KeyFsTypeOld = "FsType"
// KeyReplicationEnabled represents key for replication enabled
KeyReplicationEnabled = "isReplicationEnabled"
// KeyReplicationMode represents key for replication mode
KeyReplicationMode = "mode"
// KeyReplicationRPO represents key for replication RPO
KeyReplicationRPO = "rpo"
// KeyReplicationRemoteSystem represents key for replication remote system
Expand Down
152 changes: 94 additions & 58 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,84 +249,110 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest

// Check if replication is enabled
replicationEnabled := params[s.WithRP(KeyReplicationEnabled)]
isMetroVolume := false
isMetroVolumeGroup := false

if replicationEnabled == "true" && !useNFS {
log.Info("Preparing volume replication")

vgPrefix, ok := params[s.WithRP(KeyReplicationVGPrefix)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no volume group prefix specified in storage class")
}
rpo, ok := params[s.WithRP(KeyReplicationRPO)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no RPO specified in storage class")
}
rpoEnum := gopowerstore.RPOEnum(rpo)
if err := rpoEnum.IsValid(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid rpo value")
}
remoteSystemName, ok := params[s.WithRP(KeyReplicationRemoteSystem)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no remote system specified in storage class")
}
repMode := params[s.WithRP(KeyReplicationMode)]
if repMode == "" {
repMode = "ASYNC"
}
repMode = strings.ToUpper(repMode)

namespace := ""
if ignoreNS, ok := params[s.WithRP(KeyReplicationIgnoreNamespaces)]; ok && ignoreNS == "false" {
pvcNS, ok := params[KeyCSIPVCNamespace]
if ok {
namespace = pvcNS + "-"
switch repMode {
case "SYNC", "ASYNC":
// handle Sync and Async modes
santhoshatdell marked this conversation as resolved.
Show resolved Hide resolved
log.Infof("%s replication mode requested", repMode)
vgPrefix, ok := params[s.WithRP(KeyReplicationVGPrefix)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we always using volume groups?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I made a comment in the Metro section to optimize the volume group calls as we add the metro calls in later PRs.

if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no volume group prefix specified in storage class")
}
rpo, ok := params[s.WithRP(KeyReplicationRPO)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but no RPO specified in storage class")
}
rpoEnum := gopowerstore.RPOEnum(rpo)
if err := rpoEnum.IsValid(); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid RPO value")
}
}

vgName := vgPrefix + "-" + namespace + remoteSystemName + "-" + rpo
if len(vgName) > 128 {
vgName = vgName[:128]
}
namespace := ""
if ignoreNS, ok := params[s.WithRP(KeyReplicationIgnoreNamespaces)]; ok && ignoreNS == "false" {
pvcNS, ok := params[KeyCSIPVCNamespace]
if ok {
namespace = pvcNS + "-"
}
}

vg, err = arr.Client.GetVolumeGroupByName(ctx, vgName)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
log.Infof("Volume group with name %s not found, creating it", vgName)
vgName := vgPrefix + "-" + namespace + remoteSystemName + "-" + rpo
if len(vgName) > 128 {
vgName = vgName[:128]
}

// ensure protection policy exists
pp, err := EnsureProtectionPolicyExists(ctx, arr, vgName, remoteSystemName, rpoEnum)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't ensure protection policy exists %s", err.Error())
}
vg, err = arr.Client.GetVolumeGroupByName(ctx, vgName)
if err != nil {
if apiError, ok := err.(gopowerstore.APIError); ok && apiError.NotFound() {
log.Infof("Volume group with name %s not found, creating it", vgName)

group, err := arr.Client.CreateVolumeGroup(ctx, &gopowerstore.VolumeGroupCreate{
Name: vgName,
ProtectionPolicyID: pp,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "can't create volume group: %s", err.Error())
}
// ensure protection policy exists
pp, err := EnsureProtectionPolicyExists(ctx, arr, vgName, remoteSystemName, rpoEnum)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't ensure protection policy exists %s", err.Error())
}

vg, err = arr.Client.GetVolumeGroup(ctx, group.ID)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't query volume group by id %s : %s", group.ID, err.Error())
}
group, err := arr.Client.CreateVolumeGroup(ctx, &gopowerstore.VolumeGroupCreate{
Name: vgName,
ProtectionPolicyID: pp,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "can't create volume group: %s", err.Error())
}

} else {
return nil, status.Errorf(codes.Internal, "can't query volume group by name %s : %s", vgName, err.Error())
}
} else {
// group exists, check that protection policy applied
if vg.ProtectionPolicyID == "" {
pp, err := EnsureProtectionPolicyExists(ctx, arr, vgName, remoteSystemName, rpoEnum)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't ensure protection policy exists %s", err.Error())
vg, err = arr.Client.GetVolumeGroup(ctx, group.ID)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't query volume group by id %s : %s", group.ID, err.Error())
}

} else {
return nil, status.Errorf(codes.Internal, "can't query volume group by name %s : %s", vgName, err.Error())
}
policyUpdate := gopowerstore.VolumeGroupChangePolicy{ProtectionPolicyID: pp}
_, err = arr.Client.UpdateVolumeGroupProtectionPolicy(ctx, vg.ID, &policyUpdate)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't update volume group policy %s", err.Error())
} else {
// group exists, check that protection policy applied
if vg.ProtectionPolicyID == "" {
pp, err := EnsureProtectionPolicyExists(ctx, arr, vgName, remoteSystemName, rpoEnum)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't ensure protection policy exists %s", err.Error())
}
policyUpdate := gopowerstore.VolumeGroupChangePolicy{ProtectionPolicyID: pp}
_, err = arr.Client.UpdateVolumeGroupProtectionPolicy(ctx, vg.ID, &policyUpdate)
if err != nil {
return nil, status.Errorf(codes.Internal, "can't update volume group policy %s", err.Error())
}
}
}
}

if c, ok := creator.(*SCSICreator); ok {
c.vg = &vg
if c, ok := creator.(*SCSICreator); ok {
c.vg = &vg
}
case "METRO":
// handle Metro mode
log.Info("Metro replication mode requested")

// TODO If volumeGroup input is specified in SC - Verify VolumeGroup exists, if not create one
// There shouldn't be any protection policy on it with replication rule
// Cannot configure Metro on empty VG. Configure after volume is added
// Check if the above sync/async block can be optimized w.r.t volume group calls
// isMetroVolumeGroup = true

// isMetroVolume = true // set to true if volume group is not specified
default:
return nil, status.Errorf(codes.InvalidArgument, "replication enabled but invalid replication mode specified in storage class")
}
}

Expand All @@ -347,6 +373,16 @@ func (s *Service) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest
volumeResponse = getCSIVolume(resp.ID, sizeInBytes)
}

if isMetroVolume {
// TODO configure Metro on volume
log.Warn("Configuring Metro on volume, not yet implemented.")
} else if isMetroVolumeGroup {
// TODO configure Metro on volume group if it is first time
// else pause and resume metro session for adding new volumes
// Session needs to be paused before the new volume can be added (before creator.Create()) and then resumed later here.
log.Warn("Configuring Metro on volume group, not yet implemented.")
}

// Fetch the service tag
serviceTag := GetServiceTag(ctx, req, arr, resp.ID, protocol)

Expand Down
13 changes: 12 additions & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ var _ = ginkgo.Describe("CSIControllerService", func() {
res, err := ctrlSvc.CreateVolume(context.Background(), req)
gomega.Expect(res).To(gomega.BeNil())
gomega.Expect(err).NotTo(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("invalid rpo value"))
gomega.Expect(err.Error()).To(gomega.ContainSubstring("invalid RPO value"))
})

ginkgo.It("should fail when rpo not declared in parameters", func() {
Expand All @@ -457,6 +457,17 @@ var _ = ginkgo.Describe("CSIControllerService", func() {
gomega.Expect(err.Error()).To(gomega.ContainSubstring("replication enabled but no remote system specified in storage class"))
})

ginkgo.It("should fail when mode is incorrect", func() {
clientMock.On("CreateVolume", mock.Anything, mock.Anything).Return(gopowerstore.CreateResponse{ID: validBaseVolID}, nil)

req.Parameters[ctrlSvc.WithRP(controller.KeyReplicationMode)] = "SYNCMETRO"

res, err := ctrlSvc.CreateVolume(context.Background(), req)
gomega.Expect(res).To(gomega.BeNil())
gomega.Expect(err).NotTo(gomega.BeNil())
gomega.Expect(err.Error()).To(gomega.ContainSubstring("invalid replication mode"))
})

ginkgo.It("should fail when volume group prefix not declared in parameters", func() {
delete(req.Parameters, ctrlSvc.WithRP(controller.KeyReplicationVGPrefix))

Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,12 @@ func (s *Service) GetStorageProtectionGroupStatus(ctx context.Context,

// WithRP appends Replication Prefix to provided string
func (s *Service) WithRP(key string) string {
return s.replicationPrefix + "/" + key
replicationPrefix := s.replicationPrefix
if replicationPrefix == "" {
replicationPrefix = ReplicationPrefix
}

return replicationPrefix + "/" + key
}

func getRemoteCSIVolume(volumeID string, size int64) *csiext.Volume {
Expand Down
15 changes: 12 additions & 3 deletions samples/storageclass/powerstore-replication.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#
#
# Copyright © 2021-2022 Dell Inc. or its subsidiaries. All Rights Reserved.
# Copyright © 2021-2024 Dell Inc. or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -27,12 +27,20 @@ parameters:

# replication.storage.dell.com/isReplicationEnabled:
# Allowed values:
# true: enable replication sidecar
# false: disable replication sidecar
# true: replication is enabled
# false: replication is disabled
# Optional: true
# Default value: false
replication.storage.dell.com/isReplicationEnabled: "true"

# replication.storage.dell.com/mode: replication mode
# Allowed values:
# "ASYNC" - Asynchronous mode
# "METRO" - Metro mode
# Optional: true
# Default value: "ASYNC"
replication.storage.dell.com/mode: "ASYNC"

# replication.storage.dell.com/remoteStorageClassName:
# Allowed values: string
# Optional: true
Expand Down Expand Up @@ -76,6 +84,7 @@ parameters:
# Optional: false
# Default value: None
arrayID: "Unique"

# FsType: file system type for mounted volumes
# Allowed values:
# ext3: ext3 filesystem type
Expand Down