diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index 61601209565..6b4e2a630a1 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -115,6 +115,15 @@ func parseVolCreateRequest(req *csi.CreateVolumeRequest) (*rbdVolume, error) { return rbdVol, nil } +func storeVolumeMetadata(vol *rbdVolume, cp util.CachePersister) error { + if err := cp.Create(vol.VolID, vol); err != nil { + klog.Errorf("failed to store metadata for volume %s: %v", vol.VolID, err) + return err + } + + return nil +} + // CreateVolume creates the volume in backend and store the volume metadata func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { @@ -136,6 +145,11 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol // request if exVol.VolSize >= req.GetCapacityRange().GetRequiredBytes() { // existing volume is compatible with new request and should be reused. + + if err = storeVolumeMetadata(exVol, cs.MetadataStore); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // TODO (sbezverk) Do I need to make sure that RBD volume still exists? return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -160,16 +174,13 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol if err != nil { return nil, err } - if createErr := cs.MetadataStore.Create(rbdVol.VolID, rbdVol); createErr != nil { - klog.Warningf("failed to store volume metadata with error: %v", err) - if err = deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil { - klog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err) - return nil, err - } - return nil, createErr - } rbdVolumes[rbdVol.VolID] = rbdVol + + if err = storeVolumeMetadata(rbdVol, cs.MetadataStore); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ VolumeId: rbdVol.VolID, @@ -294,6 +305,7 @@ func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *cs // CreateSnapshot creates the snapshot in backend and stores metadata // in store +// nolint: gocyclo func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { if err := cs.validateSnapshotReq(req); err != nil { @@ -311,6 +323,10 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS // check for the requested source volume id and already allocated source volume id if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil { if req.SourceVolumeId == exSnap.SourceVolumeID { + if err = storeSnapshotMetadata(exSnap, cs.MetadataStore); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ SizeBytes: exSnap.SizeBytes, @@ -357,11 +373,12 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS rbdSnap.CreatedAt = ptypes.TimestampNow().GetSeconds() - if err = cs.storeSnapMetadata(rbdSnap, req.GetSecrets()); err != nil { - return nil, err + rbdSnapshots[snapshotID] = rbdSnap + + if err = storeSnapshotMetadata(rbdSnap, cs.MetadataStore); err != nil { + return nil, status.Error(codes.Internal, err.Error()) } - rbdSnapshots[snapshotID] = rbdSnap return &csi.CreateSnapshotResponse{ Snapshot: &csi.Snapshot{ SizeBytes: rbdSnap.SizeBytes, @@ -375,22 +392,13 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS }, nil } -func (cs *ControllerServer) storeSnapMetadata(rbdSnap *rbdSnapshot, secret map[string]string) error { - errCreate := cs.MetadataStore.Create(rbdSnap.SnapID, rbdSnap) - if errCreate != nil { - klog.Warningf("rbd: failed to store snapInfo with error: %v", errCreate) - // Unprotect snapshot - err := unprotectSnapshot(rbdSnap, rbdSnap.AdminID, secret) - if err != nil { - return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err) - } - // Deleting snapshot - klog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName) - if err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, secret); err != nil { - return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err) - } +func storeSnapshotMetadata(rbdSnap *rbdSnapshot, cp util.CachePersister) error { + if err := cp.Create(rbdSnap.SnapID, rbdSnap); err != nil { + klog.Errorf("failed to store metadata for snapshot %s: %v", rbdSnap.SnapID, err) + return err } - return errCreate + + return nil } func (cs *ControllerServer) validateSnapshotReq(req *csi.CreateSnapshotRequest) error {