Skip to content

Commit

Permalink
adding timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Pawan <pawan@mayadata.io>
  • Loading branch information
pawanpraka1 committed Apr 1, 2021
1 parent bf878c1 commit a7a1031
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 30 deletions.
25 changes: 16 additions & 9 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func waitForReadySnapshot(snapname string) error {
}

// CreateZFSVolume create new zfs volume from csi volume request
func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
func CreateZFSVolume(ctx context.Context, req *csi.CreateVolumeRequest) (string, error) {
volName := strings.ToLower(req.GetName())
size := getRoundedCapacity(req.GetCapacityRange().RequiredBytes)

Expand Down Expand Up @@ -229,10 +229,17 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
for _, node := range prfList {
vol, _ := volbuilder.BuildFrom(volObj).WithOwnerNode(node).WithVolumeStatus(zfs.ZFSStatusPending).Build()

err = zfs.ProvisionVolume(vol)
done := false

done, err = zfs.ProvisionVolume(ctx, vol)
if err == nil {
return node, nil
}

// if timeout reached, return the error and let csi retry the volume creation
if done {
break
}
}

if err != nil {
Expand All @@ -245,7 +252,7 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
}

// CreateVolClone creates the clone from a volume
func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error) {
func CreateVolClone(ctx context.Context, req *csi.CreateVolumeRequest, srcVol string) (string, error) {
volName := strings.ToLower(req.GetName())
parameters := req.GetParameters()
// lower case keys, cf CreateZFSVolume()
Expand Down Expand Up @@ -283,7 +290,7 @@ func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error)
// use the snapshot name same as new volname
volObj.Spec.SnapName = vol.Name + "@" + volName

err = zfs.ProvisionVolume(volObj)
_, err = zfs.ProvisionVolume(ctx, volObj)
if err != nil {
return "", status.Errorf(codes.Internal,
"clone: not able to provision the volume err : %s", err.Error())
Expand All @@ -293,7 +300,7 @@ func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error)
}

// CreateSnapClone creates the clone from a snapshot
func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {
func CreateSnapClone(ctx context.Context, req *csi.CreateVolumeRequest, snapshot string) (string, error) {
volName := strings.ToLower(req.GetName())
parameters := req.GetParameters()
// lower case keys, cf CreateZFSVolume()
Expand Down Expand Up @@ -336,7 +343,7 @@ func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, err
volObj.Spec = snap.Spec
volObj.Spec.SnapName = strings.ToLower(snapshot)

err = zfs.ProvisionVolume(volObj)
_, err = zfs.ProvisionVolume(ctx, volObj)
if err != nil {
return "", status.Errorf(codes.Internal,
"not able to provision the clone volume err : %s", err.Error())
Expand Down Expand Up @@ -369,12 +376,12 @@ func (cs *controller) CreateVolume(
if contentSource != nil && contentSource.GetSnapshot() != nil {
snapshotID := contentSource.GetSnapshot().GetSnapshotId()

selected, err = CreateSnapClone(req, snapshotID)
selected, err = CreateSnapClone(ctx, req, snapshotID)
} else if contentSource != nil && contentSource.GetVolume() != nil {
srcVol := contentSource.GetVolume().GetVolumeId()
selected, err = CreateVolClone(req, srcVol)
selected, err = CreateVolClone(ctx, req, srcVol)
} else {
selected, err = CreateZFSVolume(req)
selected, err = CreateZFSVolume(ctx, req)
}

if err != nil {
Expand Down
52 changes: 31 additions & 21 deletions pkg/zfs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package zfs

import (
"context"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -82,33 +83,42 @@ func init() {
GoogleAnalyticsEnabled = os.Getenv(GoogleAnalyticsKey)
}

func checkVolCreation(volname string) error {
for true {
vol, err := GetZFSVolume(volname)
if err != nil {
return fmt.Errorf("zfs: wait failed, not able to get the volume %s %s", volname, err.Error())
func checkVolCreation(ctx context.Context, volname string) (bool, error) {
timeout := time.After(10 * time.Second)
for {
select {
case <-ctx.Done():
return true, fmt.Errorf("zfs: context deadline reached")
case <-timeout:
return true, fmt.Errorf("zfs: vol creation timeout reached")
default:
vol, err := GetZFSVolume(volname)
if err != nil {
return false, fmt.Errorf("zfs: wait failed, not able to get the volume %s %s", volname, err.Error())
}

switch vol.Status.State {
case ZFSStatusReady:
return true, nil
case ZFSStatusFailed:
return false, fmt.Errorf("zfs: volume creation failed")
}

klog.Infof("zfs: waiting for volume %s/%s to be created on node %s",
vol.Spec.PoolName, volname, vol.Spec.OwnerNodeID)

time.Sleep(time.Second)
}

switch vol.Status.State {
case ZFSStatusReady:
return nil
case ZFSStatusFailed:
return fmt.Errorf("zfs: volume creation failed")
}

klog.Infof("zfs: waiting for volume %s/%s to be created on node %s",
vol.Spec.PoolName, volname, vol.Spec.OwnerNodeID)

time.Sleep(time.Second)
}
return nil
}

// ProvisionVolume creates a ZFSVolume(zv) CR,
// watcher for zvc is present in CSI agent
func ProvisionVolume(
ctx context.Context,
vol *apis.ZFSVolume,
) error {
) (bool, error) {
done := false
zv, err := GetZFSVolume(vol.Name)

if err == nil {
Expand All @@ -121,15 +131,15 @@ func ProvisionVolume(
}

if err == nil {
err = checkVolCreation(vol.Name)
done, err = checkVolCreation(ctx, vol.Name)
}

if err != nil {
klog.Infof("zfs: volume %s/%s provisioning failed on node %s err: %s",
vol.Spec.PoolName, vol.Name, vol.Spec.OwnerNodeID, err.Error())
}

return err
return done, err
}

// ResizeVolume resizes the zfs volume
Expand Down

0 comments on commit a7a1031

Please sign in to comment.