Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

avoid double creation of same volume, serialize with mutex #106

Merged
merged 3 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 30 additions & 8 deletions pkg/pmem-csi-driver/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/intel/pmem-csi/pkg/pmem-common"
pmdmanager "github.com/intel/pmem-csi/pkg/pmem-device-manager"
"github.com/intel/pmem-csi/pkg/pmem-grpc"
"k8s.io/kubernetes/pkg/util/keymutex" // TODO: move to k8s.io/utils (https://github.com/kubernetes/utils/issues/62)
)

//VolumeStatus type representation for volume status
Expand Down Expand Up @@ -57,6 +58,7 @@ type controllerServer struct {
}

var _ csi.ControllerServer = &controllerServer{}
var volumeMutex = keymutex.NewHashed(-1)

func NewControllerServer(driver *CSIDriver, mode DriverMode, rs *registryServer, dm pmdmanager.PmemDeviceManager) csi.ControllerServer {
serverCaps := []csi.ControllerServiceCapability_RPC_Type{}
Expand Down Expand Up @@ -211,6 +213,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
pmemcommon.Infof(3, ctx, "invalid delete volume req: %v", req)
return nil, err
}

// Serialize by VolumeId
volumeMutex.LockKey(req.VolumeId)
defer volumeMutex.UnlockKey(req.VolumeId)

pmemcommon.Infof(4, ctx, "DeleteVolume: volumeID: %v", req.GetVolumeId())
vol := cs.GetVolumeByID(req.GetVolumeId())
if vol == nil {
Expand Down Expand Up @@ -299,12 +306,16 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Node ID must be provided")
}

glog.Infof("ControllerPublishVolume: cs.Node: %s req.volume_id: %s, req.node_id: %s ", cs.Driver.nodeID, req.VolumeId, req.NodeId)

if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume capability must be provided")
}

// Serialize by VolumeId
volumeMutex.LockKey(req.VolumeId)
defer volumeMutex.UnlockKey(req.VolumeId)

glog.Infof("ControllerPublishVolume: cs.Node: %s req.volume_id: %s, req.node_id: %s ", cs.Driver.nodeID, req.VolumeId, req.NodeId)

if cs.mode == Controller {
vol, ok := cs.pmemVolumes[req.VolumeId]
if !ok {
Expand Down Expand Up @@ -366,13 +377,20 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs
volumeSize = vol.Size
nsmode = vol.NsMode
}
glog.Infof("ControllerPublishVolume: volumeName:%v volumeSize:%v nsmode:%v", volumeName, volumeSize, nsmode)
/* Node/Unified */
if err := cs.dm.CreateDevice(req.VolumeId, volumeSize, nsmode); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create volume: %s", err.Error())
}
// Check have we already published this volume.
// We may get called with same VolumeId repeatedly in short time
vol, published := cs.publishVolumeInfo[req.VolumeId]
if published {
glog.Infof("ControllerPublishVolume: Name:%v Id:%v already published, skip creation", vol, req.VolumeId)
} else {
glog.Infof("ControllerPublishVolume: volumeName:%v volumeSize:%v nsmode:%v", volumeName, volumeSize, nsmode)
/* Node/Unified */
if err := cs.dm.CreateDevice(req.VolumeId, volumeSize, nsmode); err != nil {
return nil, status.Errorf(codes.Internal, "Failed to create volume: %s", err.Error())
}

cs.publishVolumeInfo[req.VolumeId] = volumeName
cs.publishVolumeInfo[req.VolumeId] = volumeName
}

return &csi.ControllerPublishVolumeResponse{
PublishInfo: map[string]string{
Expand All @@ -387,6 +405,10 @@ func (cs *controllerServer) ControllerUnpublishVolume(ctx context.Context, req *
return nil, status.Error(codes.InvalidArgument, "ControllerPublishVolume Volume ID must be provided")
}

// Serialize by VolumeId
volumeMutex.LockKey(req.VolumeId)
defer volumeMutex.UnlockKey(req.VolumeId)

glog.Infof("ControllerUnpublishVolume : volume_id: %s, node_id: %s ", req.VolumeId, req.NodeId)

if cs.mode == Controller {
Expand Down
18 changes: 18 additions & 0 deletions pkg/pmem-csi-driver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "Staging target path missing in request")
}

// Serialize by VolumeId
volumeMutex.LockKey(req.GetVolumeId())
defer volumeMutex.UnlockKey(req.GetVolumeId())

targetPath := req.TargetPath
stagingtargetPath := req.StagingTargetPath
// TODO: check is bind-mount already made
Expand Down Expand Up @@ -114,6 +118,10 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}
volumeID := req.GetVolumeId()

// Serialize by VolumeId
volumeMutex.LockKey(volumeID)
defer volumeMutex.UnlockKey(volumeID)

// Unmounting the image
glog.Infof("NodeUnpublishVolume: unmount %s", targetPath)
err := mount.New("").Unmount(targetPath)
Expand Down Expand Up @@ -144,6 +152,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
attrs := req.GetVolumeAttributes()

requestedFsType := req.GetVolumeCapability().GetMount().GetFsType()

// Serialize by VolumeId
volumeMutex.LockKey(req.GetVolumeId())
defer volumeMutex.UnlockKey(req.GetVolumeId())

// showing for debug:
glog.Infof("NodeStageVolume: VolumeID is %v", req.GetVolumeId())
glog.Infof("NodeStageVolume: VolumeName is %v", attrs["name"])
Expand Down Expand Up @@ -243,6 +256,11 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}

volName := ns.volInfo[req.VolumeId]

// Serialize by VolumeId
volumeMutex.LockKey(req.GetVolumeId())
defer volumeMutex.UnlockKey(req.GetVolumeId())

// showing for debug:
glog.Infof("NodeUnStageVolume: VolumeID is %v", req.GetVolumeId())
glog.Infof("NodeUnStageVolume: VolumeName is %v", volName)
Expand Down
Loading