rbd: don't delete volume/snapshot if metadata creation fails #201

Merged · 2 commits · Feb 14, 2019
pkg/rbd/controllerserver.go — 60 changes: 34 additions & 26 deletions

@@ -115,6 +115,15 @@ func parseVolCreateRequest(req *csi.CreateVolumeRequest) (*rbdVolume, error) {
return rbdVol, nil
}

func storeVolumeMetadata(vol *rbdVolume, cp util.CachePersister) error {
if err := cp.Create(vol.VolID, vol); err != nil {
Member: Any special handling if the err is "already exists"?

Contributor Author: Already taken care of :)

    _, err = k8scm.Client.CoreV1().ConfigMaps(k8scm.Namespace).Create(cm)
    if err != nil {
    	if apierrs.IsAlreadyExists(err) {
    		klog.V(4).Infof("k8s-cm-cache: configmap already exists")
    		return nil
    	}
    	return errors.Wrapf(err, "k8s-cm-cache: couldn't persist %s metadata as configmap", identifier)
    }

klog.Errorf("failed to store metadata for volume %s: %v", vol.VolID, err)
return err
}

return nil
}
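The helper above leans on the cache persister's Create being idempotent: as the k8s-cm-cache snippet quoted in the thread shows, an "already exists" error is swallowed. Below is a minimal sketch of that contract using a hypothetical in-memory persister; `stubPersister` and `errAlreadyExists` are stand-ins for illustration, not the real `util.CachePersister` implementation:

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyExists stands in for the "already exists" condition that
// apierrs.IsAlreadyExists detects on the real ConfigMap API.
var errAlreadyExists = errors.New("already exists")

// stubPersister is a hypothetical in-memory stand-in for a metadata cache.
type stubPersister struct {
	store map[string]interface{}
}

// createRaw fails with errAlreadyExists on duplicate IDs, like the raw
// ConfigMap Create call.
func (p *stubPersister) createRaw(id string, data interface{}) error {
	if _, ok := p.store[id]; ok {
		return errAlreadyExists
	}
	p.store[id] = data
	return nil
}

// Create is the idempotent wrapper: "already exists" is treated as
// success, so a retry after a partial failure is safe.
func (p *stubPersister) Create(id string, data interface{}) error {
	if err := p.createRaw(id, data); err != nil {
		if errors.Is(err, errAlreadyExists) {
			return nil
		}
		return err
	}
	return nil
}

func main() {
	p := &stubPersister{store: map[string]interface{}{}}
	fmt.Println(p.Create("csi-vol-1", "meta")) // first create: <nil>
	fmt.Println(p.Create("csi-vol-1", "meta")) // retry: still <nil>
}
```

Because Create never fails on a duplicate, a helper like storeVolumeMetadata can be called again on the existing-volume fast path without turning a repeat request into an error.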

// CreateVolume creates the volume in backend and stores the volume metadata
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {

@@ -136,6 +145,11 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// request
if exVol.VolSize >= req.GetCapacityRange().GetRequiredBytes() {
// existing volume is compatible with new request and should be reused.

if err = storeVolumeMetadata(exVol, cs.MetadataStore); err != nil {
Collaborator: @gman0 I have a question here: as this exVol is retrieved from the metadataStore, why do we need to store it back?

Contributor Author: Well, yes and no... rbdVolumes (and snapshots) is initialized from the metadata cache, but after that it's populated by the CreateVolume function itself: https://github.com/gman0/ceph-csi/blob/d9f71d3887f4eaed3cfa34de8bbd1686163d0a32/pkg/rbd/controllerserver.go#L178

Collaborator: But my question is why we have to store it back. I think we can update the local cache once storing the metadata in the configmap succeeds. By doing this we can avoid the extra step of storing when the volume is already present, can't we?

Contributor Author: Oops, you're right! For whatever reason I completely missed this bit, https://github.com/gman0/ceph-csi/blob/d9f71d3887f4eaed3cfa34de8bbd1686163d0a32/pkg/rbd/controllerserver.go#L172-L176, which checks for the existence of the image too... nice catch!
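The existing-volume fast path under discussion can be sketched as follows. The `volume` type and `reuseIfCompatible` helper are hypothetical illustrations of the exVol check in CreateVolume, not code from this PR:

```go
package main

import "fmt"

// volume is a hypothetical, trimmed-down stand-in for rbdVolume.
type volume struct {
	id   string
	size int64
}

// reuseIfCompatible sketches the existing-volume check: if a volume with
// the requested name is already cached and is at least as large as the
// required capacity, it is reused instead of creating a new image (and
// a new volume ID).
func reuseIfCompatible(existing map[string]volume, name string, required int64) *volume {
	if v, ok := existing[name]; ok && v.size >= required {
		return &v // compatible: reuse the cached volume
	}
	return nil // not cached, or cached but too small
}

func main() {
	cache := map[string]volume{"pvc-1": {id: "csi-vol-1", size: 1 << 30}}
	if v := reuseIfCompatible(cache, "pvc-1", 1<<20); v != nil {
		fmt.Println("reusing", v.id) // prints: reusing csi-vol-1
	}
}
```

In the real controller the "too small" case is reported as an error to the caller rather than silently creating a second image; the sketch collapses both misses into nil for brevity.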

Collaborator: @gman0 Even with the current design, suppose the CSI pod restarts after creating the volume in the backend and before storing metadata in the configmap; we may end up having stale volumes in the backend. Do we have a CLI command in cephfs to list the volumes? (By checking the backend we can ensure there won't be any stale volumes even if CSI restarts; note that PVC create/delete may become a slow process.)

Collaborator: Reading more through the code, the only issue we have with a restart (as explained above) is the volumeID and snapshotID: because we generate a new UUID for each request, we may generate two volume IDs for the same volume if a restart happens.

#205 fixes the issue for now, but the approach suggested by @ShyamsundarR in #135 (comment) will fix the issue in the long run.
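The stale-image concern raised above amounts to a set difference between what the backend reports and what the metadata cache knows. Here is a hypothetical reconciliation helper; in a real controller, backendImages would come from something like `rbd ls` and cached from the rbdVolumes map, neither of which this sketch implements:

```go
package main

import "fmt"

// staleImages returns the backend image names that have no entry in the
// metadata cache: candidates left behind if the CSI pod restarted after
// creating an image but before persisting its metadata.
func staleImages(backendImages []string, cached map[string]bool) []string {
	var stale []string
	for _, img := range backendImages {
		if !cached[img] {
			stale = append(stale, img)
		}
	}
	return stale
}

func main() {
	backend := []string{"csi-vol-a", "csi-vol-b"}
	cached := map[string]bool{"csi-vol-a": true}
	fmt.Println(staleImages(backend, cached)) // [csi-vol-b]
}
```

As the comment notes, running such a check on every PVC create/delete would slow those operations down, which is why it is discussed as a trade-off rather than adopted outright.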

return nil, status.Error(codes.Internal, err.Error())
}

// TODO (sbezverk) Do I need to make sure that RBD volume still exists?
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
@@ -160,16 +174,13 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil {
return nil, err
}
if createErr := cs.MetadataStore.Create(rbdVol.VolID, rbdVol); createErr != nil {
klog.Warningf("failed to store volume metadata with error: %v", err)
if err = deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
klog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
return nil, err
}
return nil, createErr
}

rbdVolumes[rbdVol.VolID] = rbdVol

if err = storeVolumeMetadata(rbdVol, cs.MetadataStore); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: rbdVol.VolID,
@@ -294,6 +305,7 @@ func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *cs

// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
// nolint: gocyclo
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {

if err := cs.validateSnapshotReq(req); err != nil {
@@ -311,6 +323,10 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
// check for the requested source volume id and already allocated source volume id
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
if req.SourceVolumeId == exSnap.SourceVolumeID {
if err = storeSnapshotMetadata(exSnap, cs.MetadataStore); err != nil {
Collaborator: Same here as well for the snapshot.

return nil, status.Error(codes.Internal, err.Error())
}

return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: exSnap.SizeBytes,
@@ -357,11 +373,12 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS

rbdSnap.CreatedAt = ptypes.TimestampNow().GetSeconds()

if err = cs.storeSnapMetadata(rbdSnap, req.GetSecrets()); err != nil {
return nil, err
rbdSnapshots[snapshotID] = rbdSnap

if err = storeSnapshotMetadata(rbdSnap, cs.MetadataStore); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

rbdSnapshots[snapshotID] = rbdSnap
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: rbdSnap.SizeBytes,
@@ -375,22 +392,13 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
}, nil
}

func (cs *ControllerServer) storeSnapMetadata(rbdSnap *rbdSnapshot, secret map[string]string) error {
errCreate := cs.MetadataStore.Create(rbdSnap.SnapID, rbdSnap)
if errCreate != nil {
klog.Warningf("rbd: failed to store snapInfo with error: %v", errCreate)
// Unprotect snapshot
err := unprotectSnapshot(rbdSnap, rbdSnap.AdminID, secret)
if err != nil {
return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
}
// Deleting snapshot
klog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
if err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, secret); err != nil {
return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
}
func storeSnapshotMetadata(rbdSnap *rbdSnapshot, cp util.CachePersister) error {
if err := cp.Create(rbdSnap.SnapID, rbdSnap); err != nil {
klog.Errorf("failed to store metadata for snapshot %s: %v", rbdSnap.SnapID, err)
return err
}
return errCreate

return nil
}

func (cs *ControllerServer) validateSnapshotReq(req *csi.CreateSnapshotRequest) error {