Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pendingsnapshots): add pending snapshots on CVR by talking to peer replicas #1641

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 17 additions & 23 deletions cmd/cstor-pool-mgmt/controller/replica-controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,16 @@ func (c *CStorVolumeReplicaController) syncHandler(
// Synchronize cstor volume total allocated and
// used capacity fields on CVR object.
// Any kind of sync activity should be done from here.
c.syncCvr(cvrGot)
err = c.syncCVRStatus(cvrGot)
if err != nil {
c.recorder.Event(
cvrGot,
corev1.EventTypeWarning,
"SyncFailed",
fmt.Sprintf("failed to sync CVR error: %s", err.Error()),
)
return nil
}

_, err = c.clientset.
OpenebsV1alpha1().
Expand Down Expand Up @@ -624,43 +633,28 @@ func (c *CStorVolumeReplicaController) getCVRStatus(
return replicaStatus, nil
}

// syncCvr updates field on CVR object after fetching the values from zfs utility.
func (c *CStorVolumeReplicaController) syncCvr(cvr *apis.CStorVolumeReplica) {
// syncCVRStatus updates field on CVR status after fetching the values from zfs utility.
func (c *CStorVolumeReplicaController) syncCVRStatus(cvr *apis.CStorVolumeReplica) error {
// Get the zfs volume name corresponding to this cvr.
volumeName, err := volumereplica.GetVolumeName(cvr)
if err != nil {
klog.Errorf("Unable to sync CVR capacity: %v", err)
c.recorder.Event(
cvr,
corev1.EventTypeWarning,
string(common.FailureCapacitySync),
string(common.MessageResourceFailCapacitySync),
)
return err
}
// Get capacity of the volume.
capacity, err := volumereplica.Capacity(volumeName)
if err != nil {
klog.Errorf("Unable to sync CVR capacity: %v", err)
c.recorder.Event(
cvr,
corev1.EventTypeWarning,
string(common.FailureCapacitySync),
string(common.MessageResourceFailCapacitySync),
)
return errors.Wrapf(err, "failed to get volume replica capacity")
} else {
cvr.Status.Capacity = *capacity
}

if os.Getenv(string(common.RebuildEstimates)) == "true" {
err = volumereplica.GetAndUpdateSnapshotInfo(c.clientset, cvr)
if err != nil {
c.recorder.Event(
cvr,
corev1.EventTypeWarning,
"SnapshotList",
fmt.Sprintf("Unable to update snapshot list ddetails in cvr status err: %v", err),
)
return errors.Wrapf(err, "Unable to update snapshot list details in CVR")
}
}
return nil
}

func (c *CStorVolumeReplicaController) reconcileVersion(cvr *apis.CStorVolumeReplica) (
Expand Down
132 changes: 117 additions & 15 deletions cmd/cstor-pool-mgmt/volumereplica/volumereplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/openebs/maya/pkg/util"
zfs "github.com/openebs/maya/pkg/zfs/cmd/v1alpha1"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)

Expand Down Expand Up @@ -699,24 +700,125 @@ func GetAndUpdateSnapshotInfo(
}

// If CVR is in rebuilding go and get snapshot information
// from other replicas
//if cvr.Status.Phase == apis.CVRStatusRebuilding {
// err = getAndUpdatePendingSnapshotsList(clientset, cvr)
//}
// from other replicas and add snapshots under pending snapshot list
if cvr.Status.Phase == apis.CVRStatusRebuilding ||
mynktl marked this conversation as resolved.
Show resolved Hide resolved
cvr.Status.Phase == apis.CVRStatusReconstructingNewReplica {
err = getAndAddPendingSnapshotList(clientset, cvr)
if err != nil {
return errors.Wrapf(err, "failed to update pending snapshots")
}
}

// It looks like hack but we must do this because of below reason
// - There might be chances in nth reconciliation CVR might be in Rebuilding
// and pending snapshots added under CVR.Status.PendingSnapshots and after that
// let us assume this pool is down meanwhile if snapshot deletion request
// came and deleted snapshots in peer replicas. In next reconciliation if
// CVR is Healthy then there might be chances that pending snapshots remains
// as is to cover this corner case below check is required.
if cvr.Status.Phase == apis.CVRStatusOnline &&
mynktl marked this conversation as resolved.
Show resolved Hide resolved
len(cvr.Status.PendingSnapshots) != 0 {
klog.Infof("CVR: %s is marked as %s hence removing pending snapshots %v",
cvr.Name,
cvr.Status.Phase,
getSnapshotNames(cvr.Status.PendingSnapshots),
)
cvr.Status.PendingSnapshots = nil
}
return nil
}

// getAndAddPendingSnapshotList get the snapshot information from peer replicas and
// add under pending snapshot list
// NOTE: Below function will delete the snapshot under pending snapshots if doesn't exists
// on peer replicas
func getAndAddPendingSnapshotList(
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
clientset clientset.Interface, cvr *apis.CStorVolumeReplica) error {
newSnapshots := []string{}
removedSnapshots := []string{}

peerCVRList, err := getPeerReplicas(clientset, cvr)
if err != nil {
return errors.Wrapf(err, "failed to get peer CVRs of volume replica %s", cvr.Name)
}

peerSnapshotList := getPeerSnapshotInfoList(peerCVRList)
if cvr.Status.PendingSnapshots == nil {
cvr.Status.PendingSnapshots = map[string]apis.CStorSnapshotInfo{}
}

// Delete the pending snapshot that doesn't exist in peer snapshot list
for snapName, _ := range cvr.Status.PendingSnapshots {
if _, ok := peerSnapshotList[snapName]; !ok {
delete(cvr.Status.PendingSnapshots, snapName)
removedSnapshots = append(removedSnapshots, snapName)
}
}

// Add peer snapshots if doesn't exist on Snapshots and PendingSnapshots
// of current CVR
for snapName, snapInfo := range peerSnapshotList {
if _, ok := cvr.Status.Snapshots[snapName]; !ok {
if _, ok := cvr.Status.PendingSnapshots[snapName]; !ok {
cvr.Status.PendingSnapshots[snapName] = snapInfo
newSnapshots = append(newSnapshots, snapName)
}
}
}

klog.Infof(
"Adding %v pending snapshots and deleting %v pending snapshots on CVR %s",
newSnapshots,
removedSnapshots,
cvr.Name)
return nil
}

//func getAndUpdatePendingSnapshotsList(
// clientset clientset.Interface, cvr *apis.CStorVolumeReplica) error {
// peerCVRList, err := getPeerReplicas(clientset, cvr)
// if err != nil {
// return errors.Wrapf(err, "failed to get peer CVRs info of volume %s", volName)
// }
// TODO: Perform following steps
// 1. Get the snapshot info from peer replicas
// 2. If snapshot info doesn't exist under CVR.Status.Snapshots then
// add it under CVR.Status.PendingSnapshots
//}
// getPeerReplicas returns list of peer replicas of volume
func getPeerReplicas(
clientset clientset.Interface,
cvr *apis.CStorVolumeReplica) (*apis.CStorVolumeReplicaList, error) {
volName := cvr.GetLabels()[string(apis.PersistentVolumeCPK)]
peerCVRList := &apis.CStorVolumeReplicaList{}
cvrList, err := clientset.OpenebsV1alpha1().
CStorVolumeReplicas(cvr.Namespace).
List(metav1.ListOptions{LabelSelector: string(apis.PersistentVolumeCPK) + "=" + volName})
if err != nil {
return nil, err
}
for _, obj := range cvrList.Items {
if obj.Name != cvr.Name {
peerCVRList.Items = append(peerCVRList.Items, obj)
}
}
return peerCVRList, nil
}

// getPeerSnapshotInfoList returns the map of snapshot name and snapshot info
// If any healthy replica exist in peer replica it will return Status.Snapshots
// else iterate over all the degraded replicas and get snapshot list
func getPeerSnapshotInfoList(
peerCVRList *apis.CStorVolumeReplicaList) map[string]apis.CStorSnapshotInfo {

snapshotInfoList := map[string]apis.CStorSnapshotInfo{}
for _, cvrObj := range peerCVRList.Items {
for snapName, snapInfo := range cvrObj.Status.Snapshots {
if _, ok := snapshotInfoList[snapName]; !ok {
snapshotInfoList[snapName] = snapInfo
}
}
}
return snapshotInfoList
}

// getSnapshotNames returns snapshot names from map of snapshot and snapshot info
func getSnapshotNames(snapMap map[string]apis.CStorSnapshotInfo) []string {
snapNameList := make([]string, len(snapMap))
for snapName, _ := range snapMap {
snapNameList = append(snapNameList, snapName)
}
return snapNameList
}

// addOrDeleteSnapshotListInfo adds/deletes the snapshots in CVR
// It performs below steps:
Expand Down