Skip to content

Commit

Permalink
feat(zfspv): wait for zfs volume to be created
Browse files Browse the repository at this point in the history
Signed-off-by: Pawan <pawan@mayadata.io>
  • Loading branch information
pawanpraka1 committed Jan 6, 2021
1 parent 2e5e61d commit e4fac6a
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 87 deletions.
80 changes: 4 additions & 76 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,55 +101,6 @@ func getRoundedCapacity(size int64) int64 {
return ((size + Mi - 1) / Mi) * Mi
}

func waitForReadyVolume(volname string) error {
for true {
vol, err := zfs.GetZFSVolume(volname)
if err != nil {
return status.Errorf(codes.Internal,
"zfs: wait failed, not able to get the volume %s %s", volname, err.Error())
}

switch vol.Status.State {
case zfs.ZFSStatusReady:
return nil
}
time.Sleep(time.Second)
}
return nil
}

func waitForVolDestroy(volname string) error {
for true {
_, err := zfs.GetZFSVolume(volname)
if err != nil {
if k8serror.IsNotFound(err) {
return nil
}
return status.Errorf(codes.Internal,
"zfs: destroy wait failed, not able to get the volume %s %s", volname, err.Error())
}
time.Sleep(time.Second)
}
return nil
}

func waitForReadySnapshot(snapname string) error {
for true {
snap, err := zfs.GetZFSSnapshot(snapname)
if err != nil {
return status.Errorf(codes.Internal,
"zfs: wait failed, not able to get the snapshot %s %s", snapname, err.Error())
}

switch snap.Status.State {
case zfs.ZFSStatusReady:
return nil
}
time.Sleep(time.Second)
}
return nil
}

// CreateZFSVolume create new zfs volume from csi volume request
func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
volName := strings.ToLower(req.GetName())
Expand Down Expand Up @@ -180,19 +131,11 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
capacity := strconv.FormatInt(int64(size), 10)

if vol, err := zfs.GetZFSVolume(volName); err == nil {
if vol.DeletionTimestamp != nil {
if _, ok := parameters["wait"]; ok {
if err := waitForVolDestroy(volName); err != nil {
return "", err
}
}
} else {
if vol.Spec.Capacity != capacity {
return "", status.Errorf(codes.AlreadyExists,
"volume %s already present", volName)
}
return vol.Spec.OwnerNodeID, nil
if vol.Spec.Capacity != capacity {
return "", status.Errorf(codes.AlreadyExists,
"volume %s already present", volName)
}
return vol.Spec.OwnerNodeID, nil
}

nmap, err := getNodeMap(schld, pool)
Expand Down Expand Up @@ -381,12 +324,6 @@ func (cs *controller) CreateVolume(
return nil, err
}

if _, ok := parameters["wait"]; ok {
if err := waitForReadyVolume(volName); err != nil {
return nil, err
}
}

sendEventOrIgnore(pvcName, volName, strconv.FormatInt(int64(size), 10), "zfs-localpv", analytics.VolumeProvision)

topology := map[string]string{zfs.ZFSTopologyKey: selected}
Expand Down Expand Up @@ -678,15 +615,6 @@ func (cs *controller) CreateSnapshot(
)
}

originalParams := req.GetParameters()
parameters := helpers.GetCaseInsensitiveMap(&originalParams)

if _, ok := parameters["wait"]; ok {
if err := waitForReadySnapshot(snapName); err != nil {
return nil, err
}
}

state, _ = zfs.GetZFSSnapshotStatus(snapName)

return csipayload.NewCreateSnapshotResponseBuilder().
Expand Down
8 changes: 5 additions & 3 deletions pkg/mgmt/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ func (c *SnapController) syncSnap(snap *apis.ZFSSnapshot) error {
zfs.RemoveSnapFinalizer(snap)
}
} else {
// if finalizer is not set then it means we are creating
// status pendnig means we are creating
// the zfs snapshot.
if snap.Finalizers == nil {
if snap.Status.State == zfs.ZFSStatusPending {
err = zfs.CreateSnapshot(snap)
if err == nil {
err = zfs.UpdateSnapInfo(snap)
err = zfs.UpdateSnapInfo(snap, zfs.ZFSStatusReady)
} else {
err = zfs.UpdateSnapInfo(snap, zfs.ZFSStatusFailed)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/mgmt/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func (c *ZVController) syncZV(zv *apis.ZFSVolume) error {
err = zfs.CreateVolume(zv)
}
if err == nil {
err = zfs.UpdateZvolInfo(zv)
err = zfs.UpdateZvolInfo(zv, zfs.ZFSStatusReady)
} else {
err = zfs.UpdateZvolInfo(zv, zfs.ZFSStatusFailed)
}
}
}
Expand Down
110 changes: 103 additions & 7 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package zfs

import (
"fmt"
"os"
"strconv"
"time"

apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
"github.com/openebs/zfs-localpv/pkg/builder/bkpbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/restorebuilder"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
Expand Down Expand Up @@ -80,6 +83,79 @@ func init() {
GoogleAnalyticsEnabled = os.Getenv(GoogleAnalyticsKey)
}

func checkVolCreation(volname string) error {
for true {
vol, err := GetZFSVolume(volname)
if err != nil {
return fmt.Errorf(" wait failed, not able to get the volume %s %s", volname, err.Error())
}

switch vol.Status.State {
case ZFSStatusReady:
return nil
case ZFSStatusFailed:
// delete the volume
if err := DeleteVolume(volname); err != nil {
klog.Errorf(" not able to clean up the volume %s", volname)
}
return fmt.Errorf("not able to get create the volume")
}
time.Sleep(time.Second)
}
return fmt.Errorf("timeout, volume is still being created")
}

func waitForVolDestroy(volname string) error {
for true {
_, err := GetZFSVolume(volname)
if err != nil {
if k8serror.IsNotFound(err) {
return nil
}
return fmt.Errorf("destroy wait failed, not able to get the volume %s %s", volname, err.Error())
}
time.Sleep(time.Second)
}
return fmt.Errorf("timeout, volume is being destroyed")
}

func waitForSnapDestroy(snapname string) error {
for true {
_, err := GetZFSSnapshot(snapname)
if err != nil {
if k8serror.IsNotFound(err) {
return nil
}
return fmt.Errorf("destroy wait failed, not able to get the snapshot %s %s", snapname, err.Error())
}
time.Sleep(time.Second)
}
return fmt.Errorf("timeout, snapshot is being destroyed")
}

func waitForReadySnapshot(snapname string) error {
for true {
snap, err := GetZFSSnapshot(snapname)
if err != nil {
return fmt.Errorf("wait failed, not able to get the snapshot %s %s", snapname, err.Error())
}

switch snap.Status.State {
case ZFSStatusReady:
return nil
case ZFSStatusFailed:
// delete the snapshot
if err := DeleteSnapshot(snapname); err != nil {
klog.Errorf(" not able to clean up the snapshot %s", snapname)
}

return fmt.Errorf("not able to create the snapshot")
}
time.Sleep(time.Second)
}
return fmt.Errorf("timeout, snapshot is being created")
}

// ProvisionVolume creates a ZFSVolume(zv) CR,
// watcher for zvc is present in CSI agent
func ProvisionVolume(
Expand All @@ -91,6 +167,8 @@ func ProvisionVolume(
klog.Infof("provisioned volume %s", vol.Name)
}

err = checkVolCreation(vol.Name)

return err
}

Expand All @@ -114,6 +192,8 @@ func ProvisionSnapshot(
klog.Infof("provisioned snapshot %s", snap.Name)
}

err = waitForReadySnapshot(snap.Name)

return err
}

Expand All @@ -124,6 +204,8 @@ func DeleteSnapshot(snapname string) (err error) {
klog.Infof("deprovisioned snapshot %s", snapname)
}

err = waitForSnapDestroy(snapname)

return
}

Expand All @@ -141,6 +223,8 @@ func DeleteVolume(volumeID string) (err error) {
klog.Infof("deprovisioned volume %s", volumeID)
}

err = waitForVolDestroy(volumeID)

return
}

Expand All @@ -160,6 +244,7 @@ func GetZFSVolume(volumeID string) (*apis.ZFSVolume, error) {
getOptions := metav1.GetOptions{}
vol, err := volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).Get(volumeID, getOptions)

return vol, err
}

Expand All @@ -179,17 +264,22 @@ func GetZFSVolumeState(volID string) (string, string, error) {
}

// UpdateZvolInfo updates ZFSVolume CR with node id and finalizer
func UpdateZvolInfo(vol *apis.ZFSVolume) error {
finalizers := []string{ZFSFinalizer}
func UpdateZvolInfo(vol *apis.ZFSVolume, status string) error {
finalizers := []string{}
labels := map[string]string{ZFSNodeKey: NodeID}

if vol.Finalizers != nil {
return nil
}

switch status {
case ZFSStatusReady:
finalizers = append(finalizers, ZFSFinalizer)
}

newVol, err := volbuilder.BuildFrom(vol).
WithFinalizer(finalizers).
WithVolumeStatus(ZFSStatusReady).
WithVolumeStatus(status).
WithLabels(labels).Build()

if err != nil {
Expand All @@ -213,6 +303,7 @@ func GetZFSSnapshot(snapID string) (*apis.ZFSSnapshot, error) {
getOptions := metav1.GetOptions{}
snap, err := snapbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).Get(snapID, getOptions)

return snap, err
}

Expand All @@ -231,20 +322,25 @@ func GetZFSSnapshotStatus(snapID string) (string, error) {
}

// UpdateSnapInfo updates ZFSSnapshot CR with node id and finalizer
func UpdateSnapInfo(snap *apis.ZFSSnapshot) error {
finalizers := []string{ZFSFinalizer}
func UpdateSnapInfo(snap *apis.ZFSSnapshot, status string) error {
finalizers := []string{}
labels := map[string]string{ZFSNodeKey: NodeID}

if snap.Finalizers != nil {
return nil
}

switch status {
case ZFSStatusReady:
finalizers = append(finalizers, ZFSFinalizer)
}

newSnap, err := snapbuilder.BuildFrom(snap).
WithFinalizer(finalizers).
WithLabels(labels).Build()

// set the status to ready
newSnap.Status.State = ZFSStatusReady
// set the status
newSnap.Status.State = status

if err != nil {
klog.Errorf("Update snapshot failed %s err: %s", snap.Name, err.Error())
Expand Down

0 comments on commit e4fac6a

Please sign in to comment.