From 47b115667c68a386816cafa563ad61ea24c9fffe Mon Sep 17 00:00:00 2001 From: ShyamsundarR Date: Tue, 28 May 2019 15:03:18 -0400 Subject: [PATCH] Make CephFS plugin stateless reusing RADOS based journal scheme This is a part of the stateless set of commits for CephCSI. This commit removes the dependency on config maps to store cephFS provisioned volumes, and instead relies on RADOS based objects and keys, and required CSI VolumeID encoding to detect the provisioned volumes. Changes: - Provide backward compatibility to provisioned volumes by older plugin versions (1.0.0 or older) - Remove Create/Delete support for statically provisioned volumes (fixes #382) - Added namespace support to RADOS OMaps and used the same to store RADOS CSI objects and keys in the CephFS metadata pool - Added support to mention fsname for CephFS provisioning (fixes #359) - Changed field name in CSI Identifier to 'location', to denote a pool or fscid - Updated mounter cache to use new scheme - Required Helm manifests are updated - Required documentation and other manifests are updated - Made driver option 'metadatastorage' as optional, as fresh installs do not need to specify the same Testing done: - Create/Mount/Delete PVC - Create/Delete 5 PVCs - Mount version 1.0.0 PVC - Delete version 1.0.0 PV - Mount Statically defined PV/PVC/Pod - Mount Statically defined version 1.0.0 PV/PVC/Pod - Delete Statically defined version 1.0.0 PV/PVC/Pod - Node restart when mounted to test mountcache - Use InstanceID other than 'default' - RBD basic round of tests, as namespace is added to OMaps - csitest against ceph-fs plugin - NOTE: CephFS plugin still does not detect and address already created volumes but of a different size - Test not providing any value to the metadata storage parameter Signed-off-by: ShyamsundarR --- cmd/cephcsi.go | 18 +- .../helm/templates/csiplugin-configmap.yaml | 14 + .../helm/templates/nodeplugin-daemonset.yaml | 11 +- .../templates/provisioner-statefulset.yaml | 5 + deploy/cephfs/helm/values.yaml | 1 + .../csi-cephfsplugin-provisioner.yaml | 5 + .../cephfs/kubernetes/csi-cephfsplugin.yaml | 5 + deploy/cephfs/kubernetes/csi-config-map.yaml | 8 + docs/deploy-cephfs.md | 44 ++- docs/deploy-rbd.md | 5 +- examples/README.md | 13 +- examples/cephfs/plugin-deploy.sh | 2 +- examples/cephfs/plugin-teardown.sh | 2 +- examples/cephfs/secret.yaml | 4 +- examples/cephfs/storageclass.yaml | 26 +- examples/{rbd => }/csi-config-map-sample.yaml | 0 examples/rbd/plugin-deploy.sh | 2 +- examples/rbd/plugin-teardown.sh | 2 +- pkg/cephfs/cephfs_util.go | 117 +++++++ pkg/cephfs/cephuser.go | 1 + pkg/cephfs/controllerserver.go | 174 +++++++--- pkg/cephfs/driver.go | 36 ++- pkg/cephfs/errors.go | 37 +++ pkg/cephfs/fsjournal.go | 136 ++++++++ pkg/cephfs/mountcache.go | 32 +- pkg/cephfs/nodeserver.go | 37 ++- pkg/cephfs/util.go | 5 +- pkg/cephfs/volumemounter.go | 18 +- pkg/cephfs/volumeoptions.go | 297 ++++++++++++++---- pkg/rbd/rbd_util.go | 4 +- pkg/util/cephcmds.go | 84 +++-- pkg/util/util.go | 2 +- pkg/util/volid.go | 10 +- pkg/util/volid_test.go | 2 +- pkg/util/voljournal.go | 34 +- 35 files changed, 945 insertions(+), 248 deletions(-) create mode 100644 deploy/cephfs/helm/templates/csiplugin-configmap.yaml create mode 100644 deploy/cephfs/kubernetes/csi-config-map.yaml rename examples/{rbd => }/csi-config-map-sample.yaml (100%) create mode 100644 pkg/cephfs/cephfs_util.go create mode 100644 pkg/cephfs/errors.go create mode 100644 pkg/cephfs/fsjournal.go diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index f34a02dfee3b..e111bd2150dd 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -42,11 +42,11 @@ var ( endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint") driverName = flag.String("drivername", "", "name of the driver") nodeID = flag.String("nodeid", "", "node id") + instanceID = flag.String("instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+ + " instances, when sharing Ceph clusters across CSI instances for provisioning") // rbd related flags containerized = flag.Bool("containerized", true, "whether run as containerized") - instanceID = flag.String("instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+ - " instances, when sharing Ceph clusters across CSI instances for provisioning") // cephfs related flags volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')") @@ -93,6 +93,8 @@ func getDriverName() string { } func main() { + var cp util.CachePersister + driverType := getType() if len(driverType) == 0 { klog.Fatalln("driver type not specified") @@ -112,13 +114,15 @@ func main() { case cephfsType: cephfs.PluginFolder = cephfs.PluginFolder + dname - cp, err := util.CreatePersistanceStorage( - cephfs.PluginFolder, *metadataStorage, dname) - if err != nil { - os.Exit(1) + if *metadataStorage != "" { + cp, err = util.CreatePersistanceStorage( + cephfs.PluginFolder, *metadataStorage, dname) + if err != nil { + os.Exit(1) + } } driver := cephfs.NewDriver() - driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, cp) + driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, *instanceID, cp) default: klog.Fatalln("invalid volume type", vtype) // calls exit diff --git a/deploy/cephfs/helm/templates/csiplugin-configmap.yaml b/deploy/cephfs/helm/templates/csiplugin-configmap.yaml new file mode 100644 index 000000000000..66b4a55452e7 --- /dev/null +++ b/deploy/cephfs/helm/templates/csiplugin-configmap.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.configMapName | quote }} + labels: + app: {{ include "ceph-csi-cephfs.name" . }} + chart: {{ include "ceph-csi-cephfs.chart" . }} + component: {{ .Values.provisioner.name }} + release: {{ .Release.Name }} + heritage: {{ .Release.Service }} +data: + config.json: |- + [] diff --git a/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml b/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml index 24ffc8420858..d81be22caa4e 100644 --- a/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml +++ b/deploy/cephfs/helm/templates/nodeplugin-daemonset.yaml @@ -25,7 +25,7 @@ spec: spec: serviceAccountName: {{ include "ceph-csi-cephfs.serviceAccountName.nodeplugin" . }} hostNetwork: true - hostPID: true + hostPID: true # to use e.g. Rook orchestrated cluster, and mons' FQDN is # resolved through k8s service, set dns policy to cluster first dnsPolicy: ClusterFirstWithHostNet @@ -97,12 +97,14 @@ spec: - mountPath: /dev name: host-dev - mountPath: /rootfs - name: host-rootfs + name: host-rootfs - mountPath: /sys name: host-sys - mountPath: /lib/modules name: lib-modules readOnly: true + - name: ceph-csi-config + mountPath: /etc/ceph-csi-config/ resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: @@ -129,13 +131,16 @@ spec: path: /dev - name: host-rootfs hostPath: - path: / + path: / - name: host-sys hostPath: path: /sys - name: lib-modules hostPath: path: /lib/modules + - name: ceph-csi-config + configMap: + name: {{ .Values.configMapName | quote }} {{- if .Values.nodeplugin.affinity -}} affinity: {{ toYaml .Values.nodeplugin.affinity . | indent 8 }} diff --git a/deploy/cephfs/helm/templates/provisioner-statefulset.yaml b/deploy/cephfs/helm/templates/provisioner-statefulset.yaml index 539647ab130e..c5853195141f 100644 --- a/deploy/cephfs/helm/templates/provisioner-statefulset.yaml +++ b/deploy/cephfs/helm/templates/provisioner-statefulset.yaml @@ -85,6 +85,8 @@ spec: mountPath: {{ .Values.socketDir }} - name: host-rootfs mountPath: "/rootfs" + - name: ceph-csi-config + mountPath: /etc/ceph-csi-config/ resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} volumes: @@ -94,6 +96,9 @@ spec: - name: host-rootfs hostPath: path: / + - name: ceph-csi-config + configMap: + name: {{ .Values.configMapName | quote }} {{- if .Values.provisioner.affinity -}} affinity: {{ toYaml .Values.provisioner.affinity . | indent 8 }} diff --git a/deploy/cephfs/helm/values.yaml b/deploy/cephfs/helm/values.yaml index cfc64fe5440e..218074fc68a6 100644 --- a/deploy/cephfs/helm/values.yaml +++ b/deploy/cephfs/helm/values.yaml @@ -18,6 +18,7 @@ socketFile: csi.sock registrationDir: /var/lib/kubelet/plugins_registry volumeDevicesDir: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices driverName: cephfs.csi.ceph.com +configMapName: ceph-csi-config attacher: name: attacher enabled: true diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml index 473b493b9034..18594e2f2e73 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml @@ -85,6 +85,8 @@ spec: readOnly: true - name: host-dev mountPath: /dev + - name: ceph-csi-config + mountPath: /etc/ceph-csi-config/ volumes: - name: socket-dir hostPath: @@ -99,3 +101,6 @@ spec: - name: host-dev hostPath: path: /dev + - name: ceph-csi-config + configMap: + name: ceph-csi-config diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index 849cf57b3e39..2adfa4893d18 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -86,6 +86,8 @@ spec: readOnly: true - name: host-dev mountPath: /dev + - name: ceph-csi-config + mountPath: /etc/ceph-csi-config/ volumes: - name: mount-cache-dir emptyDir: {} @@ -114,3 +116,6 @@ spec: - name: host-dev hostPath: path: /dev + - name: ceph-csi-config + configMap: + name: ceph-csi-config diff --git a/deploy/cephfs/kubernetes/csi-config-map.yaml b/deploy/cephfs/kubernetes/csi-config-map.yaml new file mode 100644 index 000000000000..3efb0c1be604 --- /dev/null +++ b/deploy/cephfs/kubernetes/csi-config-map.yaml @@ -0,0 +1,8 @@ +--- +apiVersion: v1 +kind: ConfigMap +data: + config.json: |- + [] +metadata: + name: ceph-csi-config diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index 7f0b679789e4..1bfc2ac93a22 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -31,11 +31,12 @@ make image-cephfsplugin Option | Default value | Description --------------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket -`--drivername` | `cephfs.csi.ceph.com` | name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) +`--drivername` | `cephfs.csi.ceph.com` | name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) `--nodeid` | _empty_ | This node's ID `--volumemounter` | _empty_ | default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed. -`--metadatastorage` | _empty_ | Whether metadata should be kept on node as file or in a k8s configmap (`node` or `k8s_configmap`) -`--mountcachedir` | _empty_ | volume mount cache info save dir. If left unspecified, the dirver will not record mount info, or it will save mount info and when driver restart it will remount volume it cached. +`--mountcachedir` | _empty_ | volume mount cache info save dir. If left unspecified, the dirver will not record mount info, or it will save mount info and when driver restart it will remount volume it cached. +`--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning +`--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) **Available environmental variables:** @@ -50,22 +51,28 @@ is used to define in which namespace you want the configmaps to be stored Parameter | Required | Description ----------------------------------------------------------------------------------------------------|--------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- -`monitors` | yes | Comma separated list of Ceph monitors (e.g. `192.168.100.1:6789,192.168.100.2:6789,192.168.100.3:6789`) -`monValueFromSecret` | one of `monitors` and `monValueFromSecret` must be set | a string pointing the key in the credential secret, whose value is the mon. This is used for the case when the monitors' IP or hostnames are changed, the secret can be updated to pick up the new monitors. If both `monitors` and `monValueFromSecret` are set and the monitors set in the secret exists, `monValueFromSecret` takes precedence. +`clusterID` | yes | String representing a Ceph cluster, must be unique across all Ceph clusters in use for provisioning, cannot be greater than 36 bytes in length, and should remain immutable for the lifetime of the Ceph cluster in use +`fsName` | yes | CephFS filesystem name into which the volume shall be created `mounter` | no | Mount method to be used for this volume. Available options are `kernel` for Ceph kernel client and `fuse` for Ceph FUSE driver. Defaults to "default mounter", see command line arguments. -`provisionVolume` | yes | Mode of operation. BOOL value. If `true`, a new CephFS volume will be provisioned. If `false`, an existing volume will be used. -`pool` | for `provisionVolume=true` | Ceph pool into which the volume shall be created -`rootPath` | for `provisionVolume=false` | Root path of an existing CephFS volume -`csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value -`csi.storage.k8s.io/provisioner-secret-namespace`, `csi.storage.k8s.io/node-stage-secret-namespace` | for Kubernetes | namespaces of the above Secret objects +`pool` | yes | Ceph pool into which the volume shall be created +`csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | Name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value +`csi.storage.k8s.io/provisioner-secret-namespace`, `csi.storage.k8s.io/node-stage-secret-namespace` | for Kubernetes | Namespaces of the above Secret objects -**Required secrets for `provisionVolume=true`:** +**NOTE:** An accompanying CSI configuration file, needs to be provided to the +running pods. Refer to [Creating CSI configuration](../examples/README.md#creating-csi-configuration) +for more information. + +**NOTE:** A suggested way to populate and retain uniqueness of the clusterID is +to use the output of `ceph fsid` of the Ceph cluster to be used for +provisioning. + +**Required secrets for provisioning:** Admin credentials are required for provisioning new volumes * `adminID`: ID of an admin client * `adminKey`: key of the admin client -**Required secrets for `provisionVolume=false`:** +**Required secrets for statically provisioned volumes:** User credentials with access to an existing volume * `userID`: ID of a user client @@ -132,11 +139,14 @@ service/csi-cephfsplugin-provisioner ClusterIP 10.101.78.75 ... ``` -You can try deploying a demo pod from `examples/cephfs` to test the deployment further. +Once the CSI plugin configuration is updated with details from a Ceph cluster of +choice, you can try deploying a demo pod from examples/cephfs using the +instructions [provided](../examples/README.md#deploying-the-storage-class) to +test the deployment further. ### Notes on volume deletion -Volumes that were provisioned dynamically (i.e. `provisionVolume=true`) are -allowed to be deleted by the driver as well, if the user chooses to do -so.Otherwise, the driver is forbidden to delete such volumes - attempting to -delete them is a no-op. +Dynamically povisioned volumes are deleted by the driver, when requested to +do so. Statically provisioned volumes, from plugin versions less than or +equal to 1.0.0, are a no-op when a delete operation is performed against the +same, and are expected to be deleted on the Ceph cluster by the user. diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index 415213555436..6197551c8c7a 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -51,11 +51,10 @@ Parameter | Required | Description `mounter`| no | if set to `rbd-nbd`, use `rbd-nbd` on nodes that have `rbd-nbd` and `nbd` kernel modules to map rbd images **NOTE:** An accompanying CSI configuration file, needs to be provided to the -running pods. Refer to [Creating CSI configuration for RBD based -provisioning](../examples/README.md#creating-csi-configuration-for-rbd-based-provisioning) +running pods. Refer to [Creating CSI configuration](../examples/README.md#creating-csi-configuration) for more information. -**NOTE:** A suggested way to populate and retain uniquness of the clusterID is +**NOTE:** A suggested way to populate and retain uniqueness of the clusterID is to use the output of `ceph fsid` of the Ceph cluster to be used for provisioning. diff --git a/examples/README.md b/examples/README.md index c159dcddbd6c..cf11665dd6ef 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,13 +9,10 @@ By default, they look for the YAML manifests in `../../deploy/{rbd,cephfs}/kubernetes`. You can override this path by running `$ ./plugin-deploy.sh /path/to/my/manifests`. -## Creating CSI configuration for RBD based provisioning +## Creating CSI configuration -**NOTE:** This section is not required for cephfs based provisioning, and SHOULD -be skipped. - -For RBD based provisioning, the CSI plugin requires configuration information -regarding the Ceph cluster(s), that would host the RBD based block devices. This +The CSI plugin requires configuration information regarding the Ceph cluster(s), +that would host the dynamically or statically provisioned volumes. This is provided by adding a per-cluster identifier (referred to as clusterID), and the required monitor details for the same, as in the provided [sample config map](./rbd/csi-config-map-sample.yaml). @@ -31,12 +28,12 @@ Gather the following information from the Ceph cluster(s) of choice, * Alternatively, choose a `` value that is distinct per Ceph cluster in use by this kubernetes cluster -Update the [sample config map](./rbd/csi-config-map-sample.yaml) with values +Update the [sample config map](./csi-config-map-sample.yaml) with values from a Ceph cluster and replace `` with the chosen clusterID, to create the manifest for the config map which can be updated in the cluster using the following command, -* `kubectl replace -f rbd/csi-config-map-sample.yaml` +* `kubectl replace -f ./csi-config-map-sample.yaml` Storage class and snapshot class, using `` as the value for the option `clusterID`, can now be created on the cluster. diff --git a/examples/cephfs/plugin-deploy.sh b/examples/cephfs/plugin-deploy.sh index d678629b8375..b7799de3de4c 100755 --- a/examples/cephfs/plugin-deploy.sh +++ b/examples/cephfs/plugin-deploy.sh @@ -8,7 +8,7 @@ fi cd "$deployment_base" || exit 1 -objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-cephfsplugin-provisioner csi-cephfsplugin) +objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-config-map csi-cephfsplugin-provisioner csi-cephfsplugin) for obj in "${objects[@]}"; do kubectl create -f "./$obj.yaml" diff --git a/examples/cephfs/plugin-teardown.sh b/examples/cephfs/plugin-teardown.sh index ebe68072be8a..65eda0d49907 100755 --- a/examples/cephfs/plugin-teardown.sh +++ b/examples/cephfs/plugin-teardown.sh @@ -8,7 +8,7 @@ fi cd "$deployment_base" || exit 1 -objects=(csi-cephfsplugin-provisioner csi-cephfsplugin csi-provisioner-rbac csi-nodeplugin-rbac) +objects=(csi-cephfsplugin-provisioner csi-cephfsplugin csi-config-map csi-provisioner-rbac csi-nodeplugin-rbac) for obj in "${objects[@]}"; do kubectl delete -f "./$obj.yaml" diff --git a/examples/cephfs/secret.yaml b/examples/cephfs/secret.yaml index b96ec543ebca..f6472cebe0dc 100644 --- a/examples/cephfs/secret.yaml +++ b/examples/cephfs/secret.yaml @@ -5,10 +5,10 @@ metadata: name: csi-cephfs-secret namespace: default data: - # Required if provisionVolume is set to false + # Required for statically provisioned volumes userID: BASE64-ENCODED-VALUE userKey: BASE64-ENCODED-VALUE - # Required if provisionVolume is set to true + # Required for dynamically provisioned volumes adminID: BASE64-ENCODED-VALUE adminKey: BASE64-ENCODED-VALUE diff --git a/examples/cephfs/storageclass.yaml b/examples/cephfs/storageclass.yaml index fb4fd06ca5fd..8f27ea934dcb 100644 --- a/examples/cephfs/storageclass.yaml +++ b/examples/cephfs/storageclass.yaml @@ -5,27 +5,21 @@ metadata: name: csi-cephfs-sc provisioner: cephfs.csi.ceph.com parameters: - # Comma separated list of Ceph monitors - # if using FQDN, make sure csi plugin's dns policy is appropriate. - monitors: mon1:port,mon2:port,... + # String representing a Ceph cluster to provision storage from. + # Should be unique across all Ceph clusters in use for provisioning, + # cannot be greater than 36 bytes in length, and should remain immutable for + # the lifetime of the StorageClass in use. + # Ensure to create an entry in the config map named ceph-csi-config, based on + # csi-config-map-sample.yaml, to accompany the string chosen to + # represent the Ceph cluster in clusterID below + clusterID: - # For provisionVolume: "true": - # A new volume will be created along with a new Ceph user. - # Requires admin credentials (adminID, adminKey). - # For provisionVolume: "false": - # It is assumed the volume already exists and the user is expected - # to provide path to that volume (rootPath) and user credentials - # (userID, userKey). - provisionVolume: "true" + # CephFS filesystem name into which the volume shall be created + fsName: myfs # Ceph pool into which the volume shall be created - # Required for provisionVolume: "true" pool: cephfs_data - # Root path of an existing CephFS volume - # Required for provisionVolume: "false" - # rootPath: /absolute/path - # The secrets have to contain user and/or Ceph admin credentials. csi.storage.k8s.io/provisioner-secret-name: csi-cephfs-secret csi.storage.k8s.io/provisioner-secret-namespace: default diff --git a/examples/rbd/csi-config-map-sample.yaml b/examples/csi-config-map-sample.yaml similarity index 100% rename from examples/rbd/csi-config-map-sample.yaml rename to examples/csi-config-map-sample.yaml diff --git a/examples/rbd/plugin-deploy.sh b/examples/rbd/plugin-deploy.sh index 57398ee78370..06f853e48d15 100755 --- a/examples/rbd/plugin-deploy.sh +++ b/examples/rbd/plugin-deploy.sh @@ -8,7 +8,7 @@ fi cd "$deployment_base" || exit 1 -objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-rbdplugin-provisioner csi-rbdplugin) +objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-config-map csi-rbdplugin-provisioner csi-rbdplugin) for obj in "${objects[@]}"; do kubectl create -f "./$obj.yaml" diff --git a/examples/rbd/plugin-teardown.sh b/examples/rbd/plugin-teardown.sh index 2ee04be1ca36..e045aba1611b 100755 --- a/examples/rbd/plugin-teardown.sh +++ b/examples/rbd/plugin-teardown.sh @@ -8,7 +8,7 @@ fi cd "$deployment_base" || exit 1 -objects=(csi-rbdplugin-provisioner csi-rbdplugin csi-provisioner-rbac csi-nodeplugin-rbac) +objects=(csi-rbdplugin-provisioner csi-rbdplugin csi-config-map csi-provisioner-rbac csi-nodeplugin-rbac) for obj in "${objects[@]}"; do kubectl delete -f "./$obj.yaml" diff --git a/pkg/cephfs/cephfs_util.go b/pkg/cephfs/cephfs_util.go new file mode 100644 index 000000000000..134a791819f3 --- /dev/null +++ b/pkg/cephfs/cephfs_util.go @@ -0,0 +1,117 @@ +/* +Copyright 2019 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "fmt" + + "github.com/ceph/ceph-csi/pkg/util" +) + +// MDSMap is a representation of the mds map sub-structure returned by 'ceph fs get' +type MDSMap struct { + FilesystemName string `json:"fs_name"` +} + +// CephFilesystemDetails is a representation of the main json structure returned by 'ceph fs get' +type CephFilesystemDetails struct { + ID int64 `json:"id"` + MDSMap MDSMap `json:"mdsmap"` +} + +func getFscID(monitors, id, key, fsName string) (int64, error) { + // ceph fs get myfs --format=json + // {"mdsmap":{...},"id":2} + var fsDetails CephFilesystemDetails + err := execCommandJSON(&fsDetails, + "ceph", + "-m", monitors, + "--id", id, + "--key="+key, + "-c", util.CephConfigPath, + "fs", "get", fsName, "--format=json", + ) + if err != nil { + return 0, err + } + + return fsDetails.ID, nil +} + +// CephFilesystem is a representation of the json structure returned by 'ceph fs ls' +type CephFilesystem struct { + Name string `json:"name"` + MetadataPool string `json:"metadata_pool"` + MetadataPoolID int `json:"metadata_pool_id"` + DataPools []string `json:"data_pools"` + DataPoolIDs []int `json:"data_pool_ids"` +} + +func getMetadataPool(monitors, id, key, fsName string) (string, error) { + // ./tbox ceph fs ls --format=json + // [{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":4,...},...] + var filesystems []CephFilesystem + err := execCommandJSON(&filesystems, + "ceph", + "-m", monitors, + "--id", id, + "--key="+key, + "-c", util.CephConfigPath, + "fs", "ls", "--format=json", + ) + if err != nil { + return "", err + } + + for _, fs := range filesystems { + if fs.Name == fsName { + return fs.MetadataPool, nil + } + } + + return "", fmt.Errorf("fsName (%s) not found in Ceph cluster", fsName) +} + +// CephFilesystemDetails is a representation of the main json structure returned by 'ceph fs dump' +type CephFilesystemDump struct { + Filesystems []CephFilesystemDetails `json:"filesystems"` +} + +func getFsName(monitors, id, key string, fscID int64) (string, error) { + // ./tbox ceph fs dump --format=json + // JSON: {...,"filesystems":[{"mdsmap":{},"id":},...],...} + var fsDump CephFilesystemDump + err := execCommandJSON(&fsDump, + "ceph", + "-m", monitors, + "--id", id, + "--key="+key, + "-c", util.CephConfigPath, + "fs", "dump", "--format=json", + ) + if err != nil { + return "", err + } + + for _, fs := range fsDump.Filesystems { + if fs.ID == fscID { + return fs.MDSMap.FilesystemName, nil + } + } + + return "", fmt.Errorf("fscID (%d) not found in Ceph cluster", fscID) +} diff --git a/pkg/cephfs/cephuser.go b/pkg/cephfs/cephuser.go index aecc552f0b11..78f2340d7bf1 100644 --- a/pkg/cephfs/cephuser.go +++ b/pkg/cephfs/cephuser.go @@ -100,6 +100,7 @@ func createCephUser(volOptions *volumeOptions, adminCr *credentials, volID volum func deleteCephUser(volOptions *volumeOptions, adminCr *credentials, volID volumeID) error { adminID, userID := genUserIDs(adminCr, volID) + // TODO: Need to return success if userID is not found return execCommandErr("ceph", "-m", volOptions.Monitors, "-n", adminID, diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index 4f3a5a1fb505..ff5e3b6f6012 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -41,10 +41,39 @@ type controllerCacheEntry struct { } var ( - mtxControllerVolumeID = keymutex.NewHashed(0) + mtxControllerVolumeID = keymutex.NewHashed(0) + mtxControllerVolumeName = keymutex.NewHashed(0) ) -// CreateVolume creates the volume in backend and store the volume metadata +// createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID, +// and on any error cleans up any created entities +func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error { + cr, err := getAdminCredentials(secret) + if err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + + if err = createVolume(volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil { + klog.Errorf("failed to create volume %s: %v", volOptions.RequestName, err) + return status.Error(codes.Internal, err.Error()) + } + defer func() { + if err != nil { + if errDefer := purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil { + klog.Warningf("failed purging volume: %s (%s)", volOptions.RequestName, errDefer) + } + } + }() + + if _, err = createCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil { + klog.Errorf("failed to create ceph user for volume %s: %v", volOptions.RequestName, err) + return status.Error(codes.Internal, err.Error()) + } + + return nil +} + +// CreateVolume creates a reservation and the volume in backend, if it is not already present func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { if err := cs.validateCreateVolumeRequest(req); err != nil { klog.Errorf("CreateVolumeRequest validation failed: %v", err) @@ -52,67 +81,69 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } // Configuration - secret := req.GetSecrets() - volOptions, err := newVolumeOptions(req.GetParameters(), secret) + requestName := req.GetName() + volOptions, err := newVolumeOptions(requestName, req.GetCapacityRange().GetRequiredBytes(), + req.GetParameters(), secret) if err != nil { - klog.Errorf("validation of volume options failed: %v", err) + klog.Errorf("validation and extraction of volume options failed: %v", err) return nil, status.Error(codes.InvalidArgument, err.Error()) } - volID := makeVolumeID(req.GetName()) - - mtxControllerVolumeID.LockKey(string(volID)) - defer mustUnlock(mtxControllerVolumeID, string(volID)) + // Existence and conflict checks + mtxControllerVolumeName.LockKey(requestName) + defer mustUnlock(mtxControllerVolumeName, requestName) - // Create a volume in case the user didn't provide one + vID, err := checkVolExists(volOptions, secret) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if vID != nil { + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: vID.VolumeID, + CapacityBytes: volOptions.Size, + VolumeContext: req.GetParameters(), + }, + }, nil + } - if volOptions.ProvisionVolume { - // Admin credentials are required - cr, err := getAdminCredentials(secret) + // Reservation + vID, err = reserveVol(volOptions, secret) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer func() { if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - - if err = createVolume(volOptions, cr, volID, req.GetCapacityRange().GetRequiredBytes()); err != nil { - klog.Errorf("failed to create volume %s: %v", req.GetName(), err) - return nil, status.Error(codes.Internal, err.Error()) + errDefer := undoVolReservation(volOptions, *vID, secret) + if errDefer != nil { + klog.Warningf("failed undoing reservation of volume: %s (%s)", + requestName, errDefer) + } } + }() - if _, err = createCephUser(volOptions, cr, volID); err != nil { - klog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err) - return nil, status.Error(codes.Internal, err.Error()) - } - - klog.Infof("cephfs: successfully created volume %s", volID) - } else { - klog.Infof("cephfs: volume %s is provisioned statically", volID) + // Create a volume + err = cs.createBackingVolume(volOptions, vID, secret) + if err != nil { + return nil, err } - ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volID} - if err := cs.MetadataStore.Create(string(volID), ce); err != nil { - klog.Errorf("failed to store a cache entry for volume %s: %v", volID, err) - return nil, status.Error(codes.Internal, err.Error()) - } + klog.Infof("cephfs: successfully created backing volume named %s for request name %s", + vID.FsSubvolName, requestName) return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - VolumeId: string(volID), + VolumeId: vID.VolumeID, CapacityBytes: req.GetCapacityRange().GetRequiredBytes(), VolumeContext: req.GetParameters(), }, }, nil } -// DeleteVolume deletes the volume in backend -// and removes the volume metadata from store -// nolint: gocyclo -func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { - if err := cs.validateDeleteVolumeRequest(); err != nil { - klog.Errorf("DeleteVolumeRequest validation failed: %v", err) - return nil, err - } - +// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin, +// that have state information stored in files or kubernetes config maps +func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { var ( volID = volumeID(req.GetVolumeId()) secrets = req.GetSecrets() @@ -172,6 +203,65 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return &csi.DeleteVolumeResponse{}, nil } +// DeleteVolume deletes the volume in backend and its reservation +func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + if err := cs.validateDeleteVolumeRequest(); err != nil { + klog.Errorf("DeleteVolumeRequest validation failed: %v", err) + return nil, err + } + + volID := volumeID(req.GetVolumeId()) + secrets := req.GetSecrets() + + // Find the volume using the provided VolumeID + volOptions, vID, err := newVolumeOptionsFromVolID(string(volID), nil, secrets) + if err != nil { + // if error is ErrKeyNotFound, then a previous attempt at deletion was complete + // or partially complete (subvolume and imageOMap are garbage collected already), hence + // return success as deletion is complete + if _, ok := err.(util.ErrKeyNotFound); ok { + return &csi.DeleteVolumeResponse{}, nil + } + + // ErrInvalidVolID may mean this is an 1.0.0 version volume + if _, ok := err.(ErrInvalidVolID); ok && cs.MetadataStore != nil { + return cs.deleteVolumeDeprecated(req) + } + + return nil, status.Error(codes.Internal, err.Error()) + } + + // Deleting a volume requires admin credentials + cr, err := getAdminCredentials(secrets) + if err != nil { + klog.Errorf("failed to retrieve admin credentials: %v", err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + // lock out parallel delete and create requests against the same volume name as we + // cleanup the subvolume and associated omaps for the same + mtxControllerVolumeName.LockKey(volOptions.RequestName) + defer mustUnlock(mtxControllerVolumeName, volOptions.RequestName) + + if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil { + klog.Errorf("failed to delete volume %s: %v", volID, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + if err = deleteCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil { + klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + if err := undoVolReservation(volOptions, *vID, secrets); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + klog.Infof("cephfs: successfully deleted volume %s", volID) + + return &csi.DeleteVolumeResponse{}, nil +} + // ValidateVolumeCapabilities checks whether the volume capabilities requested // are supported. func (cs *ControllerServer) ValidateVolumeCapabilities( diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index 8017b02b355a..b61918f18d44 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -26,9 +26,17 @@ import ( ) const ( - // version of ceph driver version = "1.0.0" + + // volIDVersion is the version number of volume ID encoding scheme + volIDVersion uint16 = 1 + + // csiConfigFile is the location of the CSI config file + csiConfigFile = "/etc/ceph-csi-config/config.json" + + // RADOS namespace to store CSI specific objects and keys + radosNamespace = "csi" ) // PluginFolder defines the location of ceph plugin @@ -46,6 +54,14 @@ type Driver struct { var ( // DefaultVolumeMounter for mounting volumes DefaultVolumeMounter string + + // CSIInstanceID is the instance ID that is unique to an instance of CSI, used when sharing + // ceph clusters across CSI instances, to differentiate omap names per CSI instance + CSIInstanceID = "default" + + // volJournal is used to maintain RADOS based journals for CO generated + // VolumeName to backing CephFS subvolumes + volJournal *util.CSIJournal ) // NewDriver returns new ceph driver @@ -77,7 +93,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer { // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests -func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) { +func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir, instanceID string, cachePersister util.CachePersister) { klog.Infof("Driver: %v version: %v", driverName, version) // Configuration @@ -105,7 +121,21 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir klog.Fatalf("failed to write ceph configuration file: %v", err) } - initVolumeMountCache(driverName, mountCacheDir, cachePersister) + // Use passed in instance ID, if provided for omap suffix naming + if instanceID != "" { + CSIInstanceID = instanceID + } + // Get an instance of the volume journal + volJournal = util.NewCSIVolumeJournal() + + // Update keys with CSI instance suffix + volJournal.SetCSIDirectorySuffix(CSIInstanceID) + + // Update namespace for storing keys into a specific namespace on RADOS, in the CephFS + // metadata pool + volJournal.SetNamespace(radosNamespace) + + initVolumeMountCache(driverName, mountCacheDir) if mountCacheDir != "" { if err := remountCachedVolumes(); err != nil { klog.Warningf("failed to remount cached volumes: %v", err) diff --git a/pkg/cephfs/errors.go b/pkg/cephfs/errors.go new file mode 100644 index 000000000000..ab1d75c87a7c --- /dev/null +++ b/pkg/cephfs/errors.go @@ -0,0 +1,37 @@ +/* +Copyright 2019 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +// ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID +// formats +type ErrInvalidVolID struct { + err error +} + +func (e ErrInvalidVolID) Error() string { + return e.err.Error() +} + +// ErrNonStaticVolume is returned when a volume is detected as not being +// statically provisioned +type ErrNonStaticVolume struct { + err error +} + +func (e ErrNonStaticVolume) Error() string { + return e.err.Error() +} diff --git a/pkg/cephfs/fsjournal.go b/pkg/cephfs/fsjournal.go new file mode 100644 index 000000000000..587a2adc5348 --- /dev/null +++ b/pkg/cephfs/fsjournal.go @@ -0,0 +1,136 @@ +/* +Copyright 2019 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "github.com/ceph/ceph-csi/pkg/util" + + "k8s.io/klog" +) + +// volumeIdentifier structure contains an association between the CSI VolumeID to its subvolume +// name on the backing CephFS instance +type volumeIdentifier struct { + FsSubvolName string + VolumeID string +} + +/* +checkVolExists checks to determine if passed in RequestName in volOptions exists on the backend. + +**NOTE:** These functions manipulate the rados omaps that hold information regarding +volume names as requested by the CSI drivers. Hence, these need to be invoked only when the +respective CSI driver generated volume name based locks are held, as otherwise racy +access to these omaps may end up leaving them in an inconsistent state. + +These functions also cleanup omap reservations that are stale. I.e when omap entries exist and +backing subvolumes are missing, or one of the omaps exist and the next is missing. This is +because, the order of omap creation and deletion are inverse of each other, and protected by the +request name lock, and hence any stale omaps are leftovers from incomplete transactions and are +hence safe to garbage collect. +*/ +func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { + var ( + vi util.CSIIdentifier + vid volumeIdentifier + ) + + cr, err := getAdminCredentials(secret) + if err != nil { + return nil, err + } + + imageUUID, err := volJournal.CheckReservation(volOptions.Monitors, cr.id, cr.key, + volOptions.MetadataPool, volOptions.RequestName, "") + if err != nil { + return nil, err + } + if imageUUID == "" { + return nil, nil + } + vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID + + // TODO: size checks + + // found a volume already available, process and return it! + vi = util.CSIIdentifier{ + LocationID: volOptions.FscID, + EncodingVersion: volIDVersion, + ClusterID: volOptions.ClusterID, + ObjectUUID: imageUUID, + } + vid.VolumeID, err = vi.ComposeCSIID() + if err != nil { + return nil, err + } + + klog.V(4).Infof("Found existing volume (%s) with subvolume name (%s) for request (%s)", + vid.VolumeID, vid.FsSubvolName, volOptions.RequestName) + + return &vid, nil +} + +// undoVolReservation is a helper routine to undo a name reservation for a CSI VolumeName +func undoVolReservation(volOptions *volumeOptions, vid volumeIdentifier, secret map[string]string) error { + cr, err := getAdminCredentials(secret) + if err != nil { + return err + } + + err = volJournal.UndoReservation(volOptions.Monitors, cr.id, cr.key, volOptions.MetadataPool, + vid.FsSubvolName, volOptions.RequestName) + + return err +} + +// reserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and, +// to generate the volume identifier for the reserved UUID +func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { + var ( + vi util.CSIIdentifier + vid volumeIdentifier + ) + + cr, err := getAdminCredentials(secret) + if err != nil { + return nil, err + } + + imageUUID, err := volJournal.ReserveName(volOptions.Monitors, cr.id, cr.key, + volOptions.MetadataPool, volOptions.RequestName, "") + if err != nil { + return nil, err + } + vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID + + // generate the volume ID to return to the CO system + vi = util.CSIIdentifier{ + LocationID: volOptions.FscID, + EncodingVersion: volIDVersion, + ClusterID: volOptions.ClusterID, + ObjectUUID: imageUUID, + } + vid.VolumeID, err = vi.ComposeCSIID() + if err != nil { + return nil, err + } + + klog.V(4).Infof("Generated Volume ID (%s) and subvolume name (%s) for request name (%s)", + vid.VolumeID, vid.FsSubvolName, volOptions.RequestName) + + return &vid, nil +} diff --git a/pkg/cephfs/mountcache.go b/pkg/cephfs/mountcache.go index c8bc2a03b789..94234cb14753 100644 --- a/pkg/cephfs/mountcache.go +++ b/pkg/cephfs/mountcache.go @@ -16,6 +16,7 @@ type volumeMountCacheEntry struct { DriverVersion string `json:"driverVersion"` VolumeID string `json:"volumeID"` + Mounter string `json:"mounter"` Secrets map[string]string `json:"secrets"` StagingPath string `json:"stagingPath"` TargetPaths map[string]bool `json:"targetPaths"` @@ -25,7 +26,6 @@ type volumeMountCacheEntry struct { type volumeMountCacheMap struct { volumes map[string]volumeMountCacheEntry nodeCacheStore util.NodeCache - metadataStore util.CachePersister } var ( @@ -34,10 +34,9 @@ var ( volumeMountCacheMtx sync.Mutex ) -func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) { +func initVolumeMountCache(driverName string, mountCacheDir string) { volumeMountCache.volumes = make(map[string]volumeMountCacheEntry) - volumeMountCache.metadataStore = cachePersister volumeMountCache.nodeCacheStore.BasePath = mountCacheDir volumeMountCache.nodeCacheStore.CacheDir = driverName klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir) @@ -50,18 +49,19 @@ func remountCachedVolumes() error { } var remountFailCount, remountSuccCount int64 me := &volumeMountCacheEntry{} - ce := &controllerCacheEntry{} err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error { volID := me.VolumeID - if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil { - if err, ok := err.(*util.CacheEntryNotFound); ok { - klog.Infof("mount-cache: metadata not found, assuming the volume %s to be already deleted (%v)", volID, err) + if volOpts, vid, err := newVolumeOptionsFromVolID(me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil { + if err, ok := err.(util.ErrKeyNotFound); ok { + klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err) if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil { klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID) } } } else { - if err := mountOneCacheEntry(ce, me); err == nil { + // update Mounter from mount cache + volOpts.Mounter = me.Mounter + if err := mountOneCacheEntry(volOpts, vid, me); err == nil { remountSuccCount++ volumeMountCache.volumes[me.VolumeID] = *me klog.Infof("mount-cache: successfully remounted volume %s", volID) @@ -84,7 +84,7 @@ func remountCachedVolumes() error { return nil } -func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error { +func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *volumeMountCacheEntry) error { volumeMountCacheMtx.Lock() defer volumeMountCacheMtx.Unlock() @@ -92,17 +92,16 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err err error cr *credentials ) - volID := ce.VolumeID - volOptions := ce.VolOptions + volID := vid.VolumeID if volOptions.ProvisionVolume { - volOptions.RootPath = getVolumeRootPathCeph(volID) + volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName)) cr, err = getAdminCredentials(decodeCredentials(me.Secrets)) if err != nil { return err } var entity *cephEntity - entity, err = getCephUser(&volOptions, cr, volID) + entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName)) if err != nil { return err } @@ -127,12 +126,12 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err } if !isMnt { - m, err := newMounter(&volOptions) + m, err := newMounter(volOptions) if err != nil { klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err) return err } - if err := m.mount(me.StagingPath, cr, &volOptions); err != nil { + if err := m.mount(me.StagingPath, cr, volOptions); err != nil { klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err) return err } @@ -204,7 +203,7 @@ func (mc *volumeMountCacheMap) isEnable() bool { return mc.nodeCacheStore.BasePath != "" } -func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error { +func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter string, secrets map[string]string) error { if !mc.isEnable() { return nil } @@ -228,6 +227,7 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s me.Secrets = encodeCredentials(secrets) me.StagingPath = stagingTargetPath me.TargetPaths = lastTargetPaths + me.Mounter = mounter me.CreateTime = time.Now() volumeMountCache.volumes[volID] = me diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index af6c16ef4eb1..74f26ec7cc3e 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -80,6 +80,10 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi // NodeStageVolume mounts the volume to a staging path on the node. func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + var ( + volOptions *volumeOptions + vid *volumeIdentifier + ) if err := validateNodeStageVolumeRequest(req); err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -89,15 +93,26 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol stagingTargetPath := req.GetStagingTargetPath() volID := volumeID(req.GetVolumeId()) - volOptions, err := newVolumeOptions(req.GetVolumeContext(), req.GetSecrets()) + volOptions, vid, err := newVolumeOptionsFromVolID(string(volID), req.GetVolumeContext(), req.GetSecrets()) if err != nil { - klog.Errorf("error reading volume options for volume %s: %v", volID, err) - return nil, status.Error(codes.InvalidArgument, err.Error()) - } + if _, ok := err.(ErrInvalidVolID); !ok { + return nil, status.Error(codes.Internal, err.Error()) + } - if volOptions.ProvisionVolume { - // Dynamically provisioned volumes don't have their root path set, do it here - volOptions.RootPath = getVolumeRootPathCeph(volID) + // check for pre-provisioned volumes (plugin versions > 1.0.0) + volOptions, vid, err = newVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext()) + if err != nil { + if _, ok := err.(ErrNonStaticVolume); !ok { + return nil, status.Error(codes.Internal, err.Error()) + } + + // check for volumes from plugin versions <= 1.0.0 + volOptions, vid, err = newVolumeOptionsFromVersion1Context(string(volID), req.GetVolumeContext(), + req.GetSecrets()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + } } if err = createMountPoint(stagingTargetPath); err != nil { @@ -123,7 +138,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } // It's not, mount now - if err = ns.mount(volOptions, req); err != nil { + if err = ns.mount(volOptions, req, vid); err != nil { return nil, err } @@ -132,11 +147,11 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return &csi.NodeStageVolumeResponse{}, nil } -func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error { +func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest, vid *volumeIdentifier) error { stagingTargetPath := req.GetStagingTargetPath() volID := volumeID(req.GetVolumeId()) - cr, err := getCredentialsForVolume(volOptions, volID, req) + cr, err := getCredentialsForVolume(volOptions, volumeID(vid.FsSubvolName), req) if err != nil { klog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err) return status.Error(codes.Internal, err.Error()) @@ -154,7 +169,7 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ klog.Errorf("failed to mount volume %s: %v", volID, err) return status.Error(codes.Internal, err.Error()) } - if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil { + if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil { klog.Warningf("mount-cache: failed to stage volume %s %s: %v", volID, stagingTargetPath, err) } return nil diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 62a7c14ade06..069122d9d837 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -42,10 +42,6 @@ func mustUnlock(m keymutex.KeyMutex, key string) { } } -func makeVolumeID(volName string) volumeID { - return volumeID("csi-cephfs-" + volName) -} - func execCommand(program string, args ...string) (stdout, stderr []byte, err error) { var ( cmd = exec.Command(program, args...) // nolint: gosec @@ -72,6 +68,7 @@ func execCommandErr(program string, args ...string) error { return err } +//nolint: unparam func execCommandJSON(v interface{}, program string, args ...string) error { stdout, _, err := execCommand(program, args...) if err != nil { diff --git a/pkg/cephfs/volumemounter.go b/pkg/cephfs/volumemounter.go index c441d8ed2737..0df0cd8ff1d4 100644 --- a/pkg/cephfs/volumemounter.go +++ b/pkg/cephfs/volumemounter.go @@ -114,7 +114,7 @@ func newMounter(volOptions *volumeOptions) (volumeMounter, error) { type fuseMounter struct{} func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions) error { - args := [...]string{ + args := []string{ mountPoint, "-m", volOptions.Monitors, "-c", util.CephConfigPath, @@ -123,6 +123,10 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions) er "-o", "nonempty", } + if volOptions.FsName != "" { + args = append(args, "--client_mds_namespace="+volOptions.FsName) + } + _, stderr, err := execCommand("ceph-fuse", args[:]...) if err != nil { return err @@ -166,12 +170,18 @@ func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions) return err } - return execCommandErr("mount", + args := []string{ "-t", "ceph", fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath), mountPoint, - "-o", fmt.Sprintf("name=%s,secret=%s", cr.id, cr.key), - ) + } + optionsStr := fmt.Sprintf("name=%s,secret=%s", cr.id, cr.key) + if volOptions.FsName != "" { + optionsStr = optionsStr + fmt.Sprintf(",mds_namespace=%s", volOptions.FsName) + } + args = append(args, "-o", optionsStr) + + return execCommandErr("mount", args[:]...) } func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions) error { diff --git a/pkg/cephfs/volumeoptions.go b/pkg/cephfs/volumeoptions.go index 49809fd1fc11..cc7b854569f8 100644 --- a/pkg/cephfs/volumeoptions.go +++ b/pkg/cephfs/volumeoptions.go @@ -19,17 +19,22 @@ package cephfs import ( "fmt" "strconv" + + "github.com/ceph/ceph-csi/pkg/util" ) type volumeOptions struct { - Monitors string `json:"monitors"` - Pool string `json:"pool"` - RootPath string `json:"rootPath"` - + RequestName string + Size int64 + ClusterID string + FsName string + FscID int64 + MetadataPool string + Monitors string `json:"monitors"` + Pool string `json:"pool"` + RootPath string `json:"rootPath"` Mounter string `json:"mounter"` ProvisionVolume bool `json:"provisionVolume"` - - MonValueFromSecret string `json:"monValueFromSecret"` } func validateNonEmptyField(field, fieldName string) error { @@ -40,35 +45,18 @@ func validateNonEmptyField(field, fieldName string) error { return nil } -func (o *volumeOptions) validate() error { - if err := validateNonEmptyField(o.Monitors, "monitors"); err != nil { - if err = validateNonEmptyField(o.MonValueFromSecret, "monValueFromSecret"); err != nil { - return err - } - } - - if err := validateNonEmptyField(o.RootPath, "rootPath"); err != nil { - if !o.ProvisionVolume { - return err - } - } else { - if o.ProvisionVolume { - return fmt.Errorf("non-empty field rootPath is in conflict with provisionVolume=true") - } - } - - if o.ProvisionVolume { - if err := validateNonEmptyField(o.Pool, "pool"); err != nil { - return err - } +func extractOptionalOption(dest *string, optionLabel string, options map[string]string) error { + opt, ok := options[optionLabel] + if !ok { + // Option not found, no error as it is optional + return nil } - if o.Mounter != "" { - if err := validateMounter(o.Mounter); err != nil { - return err - } + if err := validateNonEmptyField(opt, optionLabel); err != nil { + return err } + *dest = opt return nil } @@ -78,6 +66,10 @@ func extractOption(dest *string, optionLabel string, options map[string]string) return fmt.Errorf("missing required field %s", optionLabel) } + if err := validateNonEmptyField(opt, optionLabel); err != nil { + return err + } + *dest = opt return nil } @@ -93,63 +85,252 @@ func validateMounter(m string) error { return nil } -func newVolumeOptions(volOptions, secret map[string]string) (*volumeOptions, error) { +func extractMounter(dest *string, options map[string]string) error { + if err := extractOptionalOption(dest, "mounter", options); err != nil { + return err + } + + if *dest != "" { + if err := validateMounter(*dest); err != nil { + return err + } + } + + return nil +} + +func getMonsAndClusterID(options map[string]string) (string, string, error) { + clusterID, ok := options["clusterID"] + if !ok { + err := fmt.Errorf("clusterID must be set") + return "", "", err + } + + if err := validateNonEmptyField(clusterID, "clusterID"); err != nil { + return "", "", err + } + + monitors, err := util.Mons(csiConfigFile, clusterID) + if err != nil { + err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", clusterID) + return "", "", err + } + + return monitors, clusterID, err +} + +// newVolumeOptions generates a new instance of volumeOptions from the provided +// CSI request parameters +func newVolumeOptions(requestName string, size int64, volOptions, secret map[string]string) (*volumeOptions, error) { var ( opts volumeOptions err error ) - // extract mon from secret first - if err = extractOption(&opts.MonValueFromSecret, "monValueFromSecret", volOptions); err == nil { - mon := "" - if mon, err = getMonValFromSecret(secret); err == nil && len(mon) > 0 { - opts.Monitors = mon - } + opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(volOptions) + if err != nil { + return nil, err } - if len(opts.Monitors) == 0 { - // if not set in secret, get it from parameter - if err = extractOption(&opts.Monitors, "monitors", volOptions); err != nil { - return nil, fmt.Errorf("either monitors or monValueFromSecret should be set") - } + + if err = extractOption(&opts.Pool, "pool", volOptions); err != nil { + return nil, err + } + + if err = extractMounter(&opts.Mounter, volOptions); err != nil { + return nil, err + } + + if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil { + return nil, err } - if err = extractNewVolOpt(&opts, volOptions); err != nil { + opts.RequestName = requestName + opts.Size = size + + cr, err := getAdminCredentials(secret) + if err != nil { return nil, err } - if err = opts.validate(); err != nil { + opts.FscID, err = getFscID(opts.Monitors, cr.id, cr.key, opts.FsName) + if err != nil { return nil, err } + opts.MetadataPool, err = getMetadataPool(opts.Monitors, cr.id, cr.key, opts.FsName) + if err != nil { + return nil, err + } + + opts.ProvisionVolume = true + return &opts, nil } -func extractNewVolOpt(opts *volumeOptions, volOpt map[string]string) error { +// newVolumeOptionsFromVolID generates a new instance of volumeOptions and volumeIdentifier +// from the provided CSI VolumeID +func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) { var ( + vi util.CSIIdentifier + volOptions volumeOptions + vid volumeIdentifier + ) + + // Decode the VolID first, to detect older volumes or pre-provisioned volumes + // before other errors + err := vi.DecomposeCSIID(volID) + if err != nil { + err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID) + return nil, nil, ErrInvalidVolID{err} + } + volOptions.ClusterID = vi.ClusterID + vid.FsSubvolName = volJournal.NamingPrefix() + vi.ObjectUUID + vid.VolumeID = volID + volOptions.FscID = vi.LocationID + + if volOptions.Monitors, err = util.Mons(csiConfigFile, vi.ClusterID); err != nil { + err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", vi.ClusterID) + return nil, nil, err + } + + cr, err := getAdminCredentials(secrets) + if err != nil { + return nil, nil, err + } + + volOptions.FsName, err = getFsName(volOptions.Monitors, cr.id, cr.key, volOptions.FscID) + if err != nil { + return nil, nil, err + } + + volOptions.MetadataPool, err = getMetadataPool(volOptions.Monitors, cr.id, cr.key, + volOptions.FsName) + if err != nil { + return nil, nil, err + } + + volOptions.RequestName, _, err = volJournal.GetObjectUUIDData(volOptions.Monitors, cr.id, cr.key, + volOptions.MetadataPool, vi.ObjectUUID, false) + if err != nil { + return nil, nil, err + } + + if volOpt != nil { + if err = extractOption(&volOptions.Pool, "pool", volOpt); err != nil { + return nil, nil, err + } + + if err = extractMounter(&volOptions.Mounter, volOpt); err != nil { + return nil, nil, err + } + } + + volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName)) + volOptions.ProvisionVolume = true + + return &volOptions, &vid, nil +} + +// newVolumeOptionsFromVersion1Context generates a new instance of volumeOptions and +// volumeIdentifier from the provided CSI volume context, if the provided context was +// for a volume created by version 1.0.0 (or prior) of the CSI plugin +func newVolumeOptionsFromVersion1Context(volID string, options, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) { + var ( + opts volumeOptions + vid volumeIdentifier provisionVolumeBool string err error ) - if err = extractOption(&provisionVolumeBool, "provisionVolume", volOpt); err != nil { - return err + + // Check if monitors is part of the options, that is an indicator this is an 1.0.0 volume + if err = extractOption(&opts.Monitors, "monitors", options); err != nil { + return nil, nil, err + } + + // check if there are mon values in secret and if so override option retrieved monitors from + // monitors in the secret + mon, err := getMonValFromSecret(secrets) + if err == nil && len(mon) > 0 { + opts.Monitors = mon + } + + if err = extractOption(&provisionVolumeBool, "provisionVolume", options); err != nil { + return nil, nil, err } if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil { - return fmt.Errorf("failed to parse provisionVolume: %v", err) + return nil, nil, fmt.Errorf("failed to parse provisionVolume: %v", err) } if opts.ProvisionVolume { - if err = extractOption(&opts.Pool, "pool", volOpt); err != nil { - return err + if err = extractOption(&opts.Pool, "pool", options); err != nil { + return nil, nil, err } + + opts.RootPath = getVolumeRootPathCeph(volumeID(volID)) } else { - if err = extractOption(&opts.RootPath, "rootPath", volOpt); err != nil { - return err + if err = extractOption(&opts.RootPath, "rootPath", options); err != nil { + return nil, nil, err } } - // This field is optional, don't check for its presence - // nolint - // (skip errcheck and gosec as this is optional) - extractOption(&opts.Mounter, "mounter", volOpt) - return nil + if err = extractMounter(&opts.Mounter, options); err != nil { + return nil, nil, err + } + + vid.FsSubvolName = volID + vid.VolumeID = volID + + return &opts, &vid, nil +} + +// newVolumeOptionsFromStaticVolume generates a new instance of volumeOptions and +// volumeIdentifier from the provided CSI volume context, if the provided context is +// detected to be a statically provisioned volume +func newVolumeOptionsFromStaticVolume(volID string, options map[string]string) (*volumeOptions, *volumeIdentifier, error) { + var ( + opts volumeOptions + vid volumeIdentifier + staticVol bool + err error + ) + + val, ok := options["staticVolume"] + if !ok { + return nil, nil, ErrNonStaticVolume{err} + } + + if staticVol, err = strconv.ParseBool(val); err != nil { + return nil, nil, fmt.Errorf("failed to parse preProvisionedVolume: %v", err) + } + + if !staticVol { + return nil, nil, ErrNonStaticVolume{err} + } + + // Volume is static, and ProvisionVolume carries bool stating if it was provisioned, hence + // store NOT of static boolean + opts.ProvisionVolume = !staticVol + + opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(options) + if err != nil { + return nil, nil, err + } + + if err = extractOption(&opts.RootPath, "rootPath", options); err != nil { + return nil, nil, err + } + + if err = extractOption(&opts.FsName, "fsName", options); err != nil { + return nil, nil, err + } + + if err = extractMounter(&opts.Mounter, options); err != nil { + return nil, nil, err + } + + vid.FsSubvolName = opts.RootPath + vid.VolumeID = volID + + return &opts, &vid, nil } diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index 780250e3a3fa..f9f31bde84bc 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -306,7 +306,7 @@ func genSnapFromSnapID(rbdSnap *rbdSnapshot, snapshotID string, credentials map[ return err } - rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, rbdSnap.AdminID, key, vi.PoolID) + rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, rbdSnap.AdminID, key, vi.LocationID) if err != nil { return err } @@ -359,7 +359,7 @@ func genVolFromVolID(rbdVol *rbdVolume, volumeID string, credentials map[string] } rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, rbdVol.AdminID, key, - vi.PoolID) + vi.LocationID) if err != nil { return err } diff --git a/pkg/util/cephcmds.go b/pkg/util/cephcmds.go index 8cf675726173..9dabdb9036b4 100644 --- a/pkg/util/cephcmds.go +++ b/pkg/util/cephcmds.go @@ -123,17 +123,22 @@ func GetPoolName(monitors string, adminID string, key string, poolID int64) (str } // SetOMapKeyValue sets the given key and value into the provided Ceph omap name -func SetOMapKeyValue(monitors, adminID, key, poolName, oMapName, oMapKey, keyValue string) error { +func SetOMapKeyValue(monitors, adminID, key, poolName, namespace, oMapName, oMapKey, keyValue string) error { // Command: "rados setomapval oMapName oMapKey keyValue" - - _, _, err := ExecCommand( - "rados", + args := []string{ "-m", monitors, "--id", adminID, - "--key="+key, + "--key=" + key, "-c", CephConfigPath, "-p", poolName, - "setomapval", oMapName, oMapKey, keyValue) + "setomapval", oMapName, oMapKey, keyValue, + } + + if namespace != "" { + args = append(args, "--namespace="+namespace) + } + + _, _, err := ExecCommand("rados", args[:]...) if err != nil { klog.Errorf("failed adding key (%s with value %s), to omap (%s) in "+ "pool (%s): (%v)", oMapKey, keyValue, oMapName, poolName, err) @@ -144,7 +149,7 @@ func SetOMapKeyValue(monitors, adminID, key, poolName, oMapName, oMapKey, keyVal } // GetOMapValue gets the value for the given key from the named omap -func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (string, error) { +func GetOMapValue(monitors, adminID, key, poolName, namespace, oMapName, oMapKey string) (string, error) { // Command: "rados getomapval oMapName oMapKey " // No such key: replicapool/csi.volumes.directory.default/csi.volname tmpFile, err := ioutil.TempFile("", "omap-get-") @@ -155,14 +160,20 @@ func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (s defer tmpFile.Close() defer os.Remove(tmpFile.Name()) - stdout, stderr, err := ExecCommand( - "rados", + args := []string{ "-m", monitors, "--id", adminID, - "--key="+key, + "--key=" + key, "-c", CephConfigPath, "-p", poolName, - "getomapval", oMapName, oMapKey, tmpFile.Name()) + "getomapval", oMapName, oMapKey, tmpFile.Name(), + } + + if namespace != "" { + args = append(args, "--namespace="+namespace) + } + + stdout, stderr, err := ExecCommand("rados", args[:]...) if err != nil { // no logs, as attempting to check for non-existent key/value is done even on // regular call sequences @@ -189,17 +200,22 @@ func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (s } // RemoveOMapKey removes the omap key from the given omap name -func RemoveOMapKey(monitors, adminID, key, poolName, oMapName, oMapKey string) error { +func RemoveOMapKey(monitors, adminID, key, poolName, namespace, oMapName, oMapKey string) error { // Command: "rados rmomapkey oMapName oMapKey" - - _, _, err := ExecCommand( - "rados", + args := []string{ "-m", monitors, "--id", adminID, - "--key="+key, + "--key=" + key, "-c", CephConfigPath, "-p", poolName, - "rmomapkey", oMapName, oMapKey) + "rmomapkey", oMapName, oMapKey, + } + + if namespace != "" { + args = append(args, "--namespace="+namespace) + } + + _, _, err := ExecCommand("rados", args[:]...) if err != nil { // NOTE: Missing omap key removal does not return an error klog.Errorf("failed removing key (%s), from omap (%s) in "+ @@ -212,17 +228,22 @@ func RemoveOMapKey(monitors, adminID, key, poolName, oMapName, oMapKey string) e // CreateObject creates the object name passed in and returns ErrObjectExists if the provided object // is already present in rados -func CreateObject(monitors, adminID, key, poolName, objectName string) error { +func CreateObject(monitors, adminID, key, poolName, namespace, objectName string) error { // Command: "rados create objectName" - - stdout, _, err := ExecCommand( - "rados", + args := []string{ "-m", monitors, "--id", adminID, - "--key="+key, + "--key=" + key, "-c", CephConfigPath, "-p", poolName, - "create", objectName) + "create", objectName, + } + + if namespace != "" { + args = append(args, "--namespace="+namespace) + } + + stdout, _, err := ExecCommand("rados", args[:]...) if err != nil { klog.Errorf("failed creating omap (%s) in pool (%s): (%v)", objectName, poolName, err) if strings.Contains(string(stdout), "error creating "+poolName+"/"+objectName+ @@ -237,17 +258,22 @@ func CreateObject(monitors, adminID, key, poolName, objectName string) error { // RemoveObject removes the entire omap name passed in and returns ErrObjectNotFound is provided omap // is not found in rados -func RemoveObject(monitors, adminID, key, poolName, oMapName string) error { +func RemoveObject(monitors, adminID, key, poolName, namespace, oMapName string) error { // Command: "rados rm oMapName" - - stdout, _, err := ExecCommand( - "rados", + args := []string{ "-m", monitors, "--id", adminID, - "--key="+key, + "--key=" + key, "-c", CephConfigPath, "-p", poolName, - "rm", oMapName) + "rm", oMapName, + } + + if namespace != "" { + args = append(args, "--namespace="+namespace) + } + + stdout, _, err := ExecCommand("rados", args[:]...) if err != nil { klog.Errorf("failed removing omap (%s) in pool (%s): (%v)", oMapName, poolName, err) if strings.Contains(string(stdout), "error removing "+poolName+">"+oMapName+ diff --git a/pkg/util/util.go b/pkg/util/util.go index eeaaae638ea1..dca416c29600 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -102,7 +102,7 @@ func GenerateVolID(monitors, id, key, pool, clusterID, objUUID string, volIDVers // generate the volume ID to return to the CO system vi := CSIIdentifier{ - PoolID: poolID, + LocationID: poolID, EncodingVersion: volIDVersion, ClusterID: clusterID, ObjectUUID: objUUID, diff --git a/pkg/util/volid.go b/pkg/util/volid.go index 971c9b33c9f5..c523214a31ba 100644 --- a/pkg/util/volid.go +++ b/pkg/util/volid.go @@ -32,8 +32,8 @@ The CSI identifier is composed as elaborated in the comment against ComposeCSIID DecomposeCSIID is the inverse of the same function. The CSIIdentifier structure carries the following fields, -- PoolID: 64 bit integer of the pool that the volume belongs to, where the ID comes from Ceph pool - identifier for the corresponding pool name. +- LocationID: 64 bit integer identifier determining the location of the volume on the Ceph cluster. + It is the ID of the poolname or fsname, for RBD or CephFS backed volumes respectively. - EncodingVersion: Carries the version number of the encoding scheme used to encode the CSI ID, and is preserved for any future proofing w.r.t changes in the encoding scheme, and to retain ability to parse backward compatible encodings. @@ -43,7 +43,7 @@ The CSIIdentifier structure carries the following fields, corresponds to this CSI ID. */ type CSIIdentifier struct { - PoolID int64 // TODO: Name appropriately when reused for CephFS + LocationID int64 EncodingVersion uint16 ClusterID string ObjectUUID string @@ -87,7 +87,7 @@ func (ci CSIIdentifier) ComposeCSIID() (string, error) { binary.BigEndian.PutUint16(buf16, uint16(len(ci.ClusterID))) clusterIDLength := hex.EncodeToString(buf16) - binary.BigEndian.PutUint64(buf64, uint64(ci.PoolID)) + binary.BigEndian.PutUint64(buf64, uint64(ci.LocationID)) poolIDEncodedHex := hex.EncodeToString(buf64) return strings.Join([]string{versionEncodedHex, clusterIDLength, ci.ClusterID, @@ -136,7 +136,7 @@ func (ci *CSIIdentifier) DecomposeCSIID(composedCSIID string) (err error) { if err != nil { return err } - ci.PoolID = int64(binary.BigEndian.Uint64(buf64)) + ci.LocationID = int64(binary.BigEndian.Uint64(buf64)) // 16 for poolID encoding and 1 for '-' separator bytesToProcess -= 17 nextFieldStartIdx = nextFieldStartIdx + 17 diff --git a/pkg/util/volid_test.go b/pkg/util/volid_test.go index 04f4027ee946..a7053ec81b40 100644 --- a/pkg/util/volid_test.go +++ b/pkg/util/volid_test.go @@ -33,7 +33,7 @@ type testTuple struct { var testData = []testTuple{ { vID: CSIIdentifier{ - PoolID: 0xffff, + LocationID: 0xffff, EncodingVersion: 0xffff, ClusterID: "01616094-9d93-4178-bf45-c7eac19e8b15", ObjectUUID: "00000000-1111-2222-bbbb-cacacacacaca", diff --git a/pkg/util/voljournal.go b/pkg/util/voljournal.go index b312b419dc40..3f74f035f3f7 100644 --- a/pkg/util/voljournal.go +++ b/pkg/util/voljournal.go @@ -114,6 +114,9 @@ type CSIJournal struct { // volume name prefix for naming on Ceph rbd or FS, suffix is a uuid generated per volume namingPrefix string + + // namespace in which the RADOS objects are stored, default is no namespace + namespace string } // CSIVolumeJournal returns an instance of volume keys @@ -125,6 +128,7 @@ func NewCSIVolumeJournal() *CSIJournal { csiNameKey: "csi.volname", namingPrefix: "csi-vol-", cephSnapSourceKey: "", + namespace: "", } } @@ -137,6 +141,7 @@ func NewCSISnapshotJournal() *CSIJournal { csiNameKey: "csi.snapname", namingPrefix: "csi-snap-", cephSnapSourceKey: "csi.source", + namespace: "", } } @@ -150,6 +155,11 @@ func (cj *CSIJournal) SetCSIDirectorySuffix(suffix string) { cj.csiDirectory = cj.csiDirectory + "." + suffix } +// SetNamespace sets the namespace in which all RADOS objects would be created +func (cj *CSIJournal) SetNamespace(ns string) { + cj.namespace = ns +} + /* CheckReservation checks if given request name contains a valid reservation - If there is a valid reservation, then the corresponding UUID for the volume/snapshot is returned @@ -177,7 +187,7 @@ func (cj *CSIJournal) CheckReservation(monitors, id, key, pool, reqName, parentN } // check if request name is already part of the directory omap - objUUID, err := GetOMapValue(monitors, id, key, pool, cj.csiDirectory, + objUUID, err := GetOMapValue(monitors, id, key, pool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { // error should specifically be not found, for volume to be absent, any other error @@ -237,7 +247,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName // delete volume UUID omap (first, inverse of create order) // TODO: Check cases where volName can be empty, and we need to just cleanup the reqName imageUUID := strings.TrimPrefix(volName, cj.namingPrefix) - err := RemoveObject(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+imageUUID) + err := RemoveObject(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID) if err != nil { if _, ok := err.(ErrObjectNotFound); !ok { klog.Errorf("failed removing oMap %s (%s)", cj.cephUUIDDirectoryPrefix+imageUUID, err) @@ -246,7 +256,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName } // delete the request name key (last, inverse of create order) - err = RemoveOMapKey(monitors, id, key, pool, cj.csiDirectory, + err = RemoveOMapKey(monitors, id, key, pool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { klog.Errorf("failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err) @@ -259,7 +269,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName // reserveOMapName creates an omap with passed in oMapNamePrefix and a generated . // It ensures generated omap name does not already exist and if conflicts are detected, a set // number of retires with newer uuids are attempted before returning an error -func reserveOMapName(monitors, id, key, pool, oMapNamePrefix string) (string, error) { +func reserveOMapName(monitors, id, key, pool, namespace, oMapNamePrefix string) (string, error) { var iterUUID string maxAttempts := 5 @@ -268,7 +278,7 @@ func reserveOMapName(monitors, id, key, pool, oMapNamePrefix string) (string, er // generate a uuid for the image name iterUUID = uuid.NewUUID().String() - err := CreateObject(monitors, id, key, pool, oMapNamePrefix+iterUUID) + err := CreateObject(monitors, id, key, pool, namespace, oMapNamePrefix+iterUUID) if err != nil { if _, ok := err.(ErrObjectExists); ok { attempt++ @@ -318,15 +328,15 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s // NOTE: If any service loss occurs post creation of the UUID directory, and before // setting the request name key (csiNameKey) to point back to the UUID directory, the // UUID directory key will be leaked - volUUID, err := reserveOMapName(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix) + volUUID, err := reserveOMapName(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix) if err != nil { return "", err } // Create request name (csiNameKey) key in csiDirectory and store the UUId based // volume name into it - err = SetOMapKeyValue(monitors, id, key, pool, cj.csiDirectory, cj.csiNameKeyPrefix+reqName, - volUUID) + err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.csiDirectory, + cj.csiNameKeyPrefix+reqName, volUUID) if err != nil { return "", err } @@ -342,7 +352,7 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s }() // Update UUID directory to store CSI request name - err = SetOMapKeyValue(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+volUUID, + err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.csiNameKey, reqName) if err != nil { return "", err @@ -350,7 +360,7 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s if snapSource { // Update UUID directory to store source volume UUID in case of snapshots - err = SetOMapKeyValue(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+volUUID, + err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.cephSnapSourceKey, parentName) if err != nil { return "", err @@ -376,14 +386,14 @@ func (cj *CSIJournal) GetObjectUUIDData(monitors, id, key, pool, objectUUID stri } // TODO: fetch all omap vals in one call, than make multiple listomapvals - requestName, err := GetOMapValue(monitors, id, key, pool, + requestName, err := GetOMapValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey) if err != nil { return "", "", err } if snapSource { - sourceName, err = GetOMapValue(monitors, id, key, pool, + sourceName, err = GetOMapValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey) if err != nil { return "", "", err