Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pendingsnapshots): add pending snapshots on CVR by talking to peer replicas #1641

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions cmd/cstor-pool-mgmt/controller/replica-controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,16 @@ func (c *CStorVolumeReplicaController) syncHandler(
// Synchronize cstor volume total allocated and
// used capacity fields on CVR object.
// Any kind of sync activity should be done from here.
c.syncCvr(cvrGot)
err = c.syncCvr(cvrGot)
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
c.recorder.Event(
cvrGot,
corev1.EventTypeWarning,
"SyncFailed",
fmt.Sprintf("failed to sync CVR error: %s", err.Error()),
)
return nil
}

_, err = c.clientset.
OpenebsV1alpha1().
Expand Down Expand Up @@ -625,42 +634,27 @@ func (c *CStorVolumeReplicaController) getCVRStatus(
}

// syncCvr updates field on CVR object after fetching the values from zfs utility.
func (c *CStorVolumeReplicaController) syncCvr(cvr *apis.CStorVolumeReplica) {
func (c *CStorVolumeReplicaController) syncCvr(cvr *apis.CStorVolumeReplica) error {
// Get the zfs volume name corresponding to this cvr.
volumeName, err := volumereplica.GetVolumeName(cvr)
if err != nil {
klog.Errorf("Unable to sync CVR capacity: %v", err)
c.recorder.Event(
cvr,
corev1.EventTypeWarning,
string(common.FailureCapacitySync),
string(common.MessageResourceFailCapacitySync),
)
return err
}
// Get capacity of the volume.
capacity, err := volumereplica.Capacity(volumeName)
if err != nil {
klog.Errorf("Unable to sync CVR capacity: %v", err)
c.recorder.Event(
cvr,
corev1.EventTypeWarning,
string(common.FailureCapacitySync),
string(common.MessageResourceFailCapacitySync),
)
return errors.Wrapf(err, "failed to get volume replica capacity")
} else {
cvr.Status.Capacity = *capacity
}

if os.Getenv(string(common.RebuildEstimates)) == "true" {
err = volumereplica.GetAndUpdateSnapshotInfo(c.clientset, cvr)
if err != nil {
c.recorder.Event(
cvr,
corev1.EventTypeWarning,
"SnapshotList",
fmt.Sprintf("Unable to update snapshot list ddetails in cvr status err: %v", err),
)
return errors.Wrapf(err, "Unable to update snapshot list details in CVR status err: %v", err)
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
}
}
return nil
}

func (c *CStorVolumeReplicaController) reconcileVersion(cvr *apis.CStorVolumeReplica) (
Expand Down
134 changes: 119 additions & 15 deletions cmd/cstor-pool-mgmt/volumereplica/volumereplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/openebs/maya/pkg/util"
zfs "github.com/openebs/maya/pkg/zfs/cmd/v1alpha1"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)

Expand Down Expand Up @@ -699,24 +700,127 @@ func GetAndUpdateSnapshotInfo(
}

// If CVR is in rebuilding go and get snapshot information
// from other replicas
//if cvr.Status.Phase == apis.CVRStatusRebuilding {
// err = getAndUpdatePendingSnapshotsList(clientset, cvr)
//}
// from other replicas and add snapshots under pending snapshot list
if cvr.Status.Phase == apis.CVRStatusRebuilding ||
mynktl marked this conversation as resolved.
Show resolved Hide resolved
cvr.Status.Phase == apis.CVRStatusReconstructingNewReplica {
err = getAndAddPendingSnapshotList(clientset, cvr)
if err != nil {
return errors.Wrapf(err, "failed to update pending snapshots")
}
}

// It looks like hack but we must do this because of below reason
// - There might be chances in nth reconciliation CVR might be in Rebuilding
// and pending snapshots added under CVR.Status.PendingSnapshots and after that
// let us assume this pool is down meanwhile if snapshot deletion request
// came and deleted snapshots in peer replicas. In next reconciliation if
// CVR is Healthy then there might be chances that pending snapshots remains
// as is to cover this corner case below check is required.
if cvr.Status.Phase == apis.CVRStatusOnline &&
mynktl marked this conversation as resolved.
Show resolved Hide resolved
len(cvr.Status.PendingSnapshots) != 0 {
cvr.Status.PendingSnapshots = nil
}
return nil
}

//func getAndUpdatePendingSnapshotsList(
// clientset clientset.Interface, cvr *apis.CStorVolumeReplica) error {
// peerCVRList, err := getPeerReplicas(clientset, cvr)
// if err != nil {
// return errors.Wrapf(err, "failed to get peer CVRs info of volume %s", volName)
// }
// TODO: Perform following steps
// 1. Get the snapshot info from peer replicas
// 2. If snapshot info doesn't exist under CVR.Status.Snapshots then
// add it under CVR.Status.PendingSnapshots
//}
// getAndAddPendingSnapshotList get the snapshot information from peer replicas and
// add under pending snapshot list
// NOTE: Below function will delete the snapshot under pending snapshots if doesn't exists
// on peer replicas
func getAndAddPendingSnapshotList(
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
clientset clientset.Interface, cvr *apis.CStorVolumeReplica) error {
peerCVRList, err := getPeerReplicas(clientset, cvr)
if err != nil {
return errors.Wrapf(err, "failed to get peer CVRs of volume replica %s", cvr.Name)
}

peerSnapshotList := getPeerSnapshotInfoList(peerCVRList)
if cvr.Status.PendingSnapshots == nil {
cvr.Status.PendingSnapshots = map[string]apis.CStorSnapshotInfo{}
}

// Delete the pending snapshot that doesn't exist in peer snapshot list
for snapName, _ := range cvr.Status.PendingSnapshots {
if _, ok := peerSnapshotList[snapName]; !ok {
delete(cvr.Status.PendingSnapshots, snapName)
}
}

// Add peer snapshots if doesn't exist on Snapshots and PendingSnapshots
// of current CVR
for snapName, snapInfo := range peerSnapshotList {
if _, ok := cvr.Status.Snapshots[snapName]; !ok {
if _, ok := cvr.Status.PendingSnapshots[snapName]; !ok {
cvr.Status.PendingSnapshots[snapName] = snapInfo
}
}
}

return nil
}

// getPeerReplicas returns list of peer replicas of volume
func getPeerReplicas(
clientset clientset.Interface,
cvr *apis.CStorVolumeReplica) (*apis.CStorVolumeReplicaList, error) {
volName := cvr.GetLabels()[string(apis.PersistentVolumeCPK)]
peerCVRList := &apis.CStorVolumeReplicaList{}
cvrList, err := clientset.OpenebsV1alpha1().
CStorVolumeReplicas(cvr.Namespace).
List(metav1.ListOptions{LabelSelector: string(apis.PersistentVolumeCPK) + "=" + volName})
if err != nil {
return nil, err
}
for _, obj := range cvrList.Items {
if obj.Name != cvr.Name {
peerCVRList.Items = append(peerCVRList.Items, obj)
}
}
return peerCVRList, nil
}

// getPeerSnapshotInfoList returns the map of snapshot name and snapshot info
// If any healthy replica exist in peer replica it will return Status.Snapshots
// else iterate over all the degraded replicas and get snapshot list
func getPeerSnapshotInfoList(
peerCVRList *apis.CStorVolumeReplicaList) map[string]apis.CStorSnapshotInfo {

// TODO: Get Opinion From Review Comments
// NOTE: There are possibilites to have stale phase
healthyReplica := getHealthyReplicaFromPeerReplicas(peerCVRList)
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
if healthyReplica != nil {
// Since Updating the status of replica and snapshot list is atomic
// update safe to return status.snapshots
return healthyReplica.Status.Snapshots
}

snapshotInfoList := map[string]apis.CStorSnapshotInfo{}
for _, cvrObj := range peerCVRList.Items {
// No need to get snapshot information from Offline,
// NewReplicaDegraded and Recreate CVR becasue they might
// consist stale information
if cvrObj.Status.Phase == apis.CVRStatusDegraded {
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
for snapName, snapInfo := range cvrObj.Status.Snapshots {
if _, ok := snapshotInfoList[snapName]; !ok {
snapshotInfoList[snapName] = snapInfo
}
}
}
}
return snapshotInfoList
}

// getHealthyReplicaFromPeerReplicas returns healthy replica from the
// peer replica list if exists else it return nil
func getHealthyReplicaFromPeerReplicas(
peerCVRList *apis.CStorVolumeReplicaList) *apis.CStorVolumeReplica {
for _, cvrObj := range peerCVRList.Items {
if cvrObj.Status.Phase == apis.CVRStatusOnline {
return &cvrObj
}
}
return nil
}

// addOrDeleteSnapshotListInfo adds/deletes the snapshots in CVR
// It performs below steps:
Expand Down