diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml index 0d43b1671109..64b74e2abd2d 100644 --- a/charts/ceph-csi-rbd/values.yaml +++ b/charts/ceph-csi-rbd/values.yaml @@ -27,6 +27,11 @@ serviceAccounts: # - "" # rbd: # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true +# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Configuration details of clusterID,PoolID and FscID mapping diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index dac23d21e44a..9aa43c547c4a 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -88,6 +88,8 @@ func init() { "", "list of Kubernetes node labels, that determines the"+ " CRUSH location the node belongs to, separated by ','") + flag.StringVar(&conf.ContainerOrchestrator, "container-orchestrator", "kubernetes", + "container orchestrator [kubernetes]") // cephfs related flags flag.BoolVar( diff --git a/deploy/csi-config-map-sample.yaml b/deploy/csi-config-map-sample.yaml index b48e834a56f6..6397bbc84855 100644 --- a/deploy/csi-config-map-sample.yaml +++ b/deploy/csi-config-map-sample.yaml @@ -66,6 +66,15 @@ data: } "nfs": { "netNamespaceFilePath": "/plugins/nfs.csi.ceph.com/net", + }, + "readAffinity": { + "enabled": "false", + "crushLocationLabels": [ + "", + "" + ... + "" + ] } } ] diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index d542fe3035df..2ff23f97ac37 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -48,8 +48,7 @@ make image-cephcsi | `--skipforceflatten` | `false` | skip image flattening on kernel < 5.2 which support mapping of rbd images which has the deep-flatten feature | | `--maxsnapshotsonimage` | `450` | Maximum number of snapshots allowed on rbd image without flattening | | `--setmetadata` | `false` | Set metadata on volume | -| `--enable-read-affinity` | `false` | enable read affinity | -| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ',' | +| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.
`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` | **Available volume parameters:** @@ -215,7 +214,7 @@ for more details. This can be enabled by adding labels to Kubernetes nodes like `"topology.io/region=east"` and `"topology.io/zone=east-zone1"` and -passing command line arguments `"--enable-read-affinity=true"` and +passing command line argument `"--crush-location-labels=topology.io/zone,topology.io/region"` to Ceph CSI RBD daemonset pod "csi-rbdplugin" container, resulting in Ceph CSI adding `"--options read_from_replica=localize,crush_location=zone:east-zone1|region:east"` @@ -224,6 +223,12 @@ If enabled, this option will be added to all RBD volumes mapped by Ceph CSI. Well known labels can be found [here](https://kubernetes.io/docs/reference/labels-annotations-taints/). +Read affinity can be configured for individual clusters within the +`ceph-csi-config` ConfigMap. This allows configuring the crush location labels +for each ceph cluster separately. The crush location labels specified in the +ConfigMap will supersede those provided via command line argument +`--crush-location-labels`. + >Note: Label values will have all its dots `"."` normalized with dashes `"-"` in order for it to work with ceph CRUSH map. diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 950a063afce4..112dcdbc2794 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -26,6 +26,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" @@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer { func NewNodeServer( d *csicommon.CSIDriver, t string, - topology map[string]string, - crushLocationMap map[string]string, + nodeLabels, topology, crushLocationMap map[string]string, ) (*rbd.NodeServer, error) { ns := rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), - VolumeLocks: util.NewVolumeLocks(), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + VolumeLocks: util.NewVolumeLocks(), + NodeLabels: nodeLabels, + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), } - ns.SetReadAffinityMapOptions(crushLocationMap) return &ns, nil } @@ -87,8 +88,8 @@ func NewNodeServer( // setupCSIAddonsServer(). func (r *Driver) Run(conf *util.Config) { var ( - err error - topology, crushLocationMap map[string]string + err error + nodeLabels, topology, crushLocationMap map[string]string ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) @@ -125,8 +126,15 @@ func (r *Driver) Run(conf *util.Config) { }) } + if conf.ContainerOrchestrator == "kubernetes" { + nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + } + if conf.EnableReadAffinity { - crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels) if err != nil { log.FatalLogMsg(err.Error()) } @@ -140,7 +148,7 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) + r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 25f51b35c125..96e5ed84aa4d 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,8 +45,10 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks *util.VolumeLocks - // readAffinityMapOptions contains map options to enable read affinity. - readAffinityMapOptions string + // NodeLabels stores the node labels + NodeLabels map[string]string + // CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity. + CLIReadAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed @@ -258,11 +260,10 @@ func (ns *NodeServer) populateRbdVol( rv.Mounter = rbdNbdMounter } - err = getMapOptions(req, rv) + err = ns.getMapOptions(req, rv) if err != nil { return nil, err } - ns.appendReadAffinityMapOptions(rv) rv.VolID = volID @@ -280,14 +281,14 @@ func (ns *NodeServer) populateRbdVol( // appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions // if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty. -func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) { +func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) { switch { - case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: + case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: return case rv.MapOptions != "": - rv.MapOptions += "," + ns.readAffinityMapOptions + rv.MapOptions += "," + readAffinityMapOptions default: - rv.MapOptions = ns.readAffinityMapOptions + rv.MapOptions = readAffinityMapOptions } } @@ -1395,22 +1396,3 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) { return size, nil } - -func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) { - if len(crushLocationMap) == 0 { - return - } - - var b strings.Builder - b.WriteString("read_from_replica=localize,crush_location=") - first := true - for key, val := range crushLocationMap { - if first { - b.WriteString(fmt.Sprintf("%s:%s", key, val)) - first = false - } else { - b.WriteString(fmt.Sprintf("|%s:%s", key, val)) - } - } - ns.readAffinityMapOptions = b.String() -} diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 822232c6fac9..36ce8c292048 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -20,6 +20,8 @@ import ( "context" "testing" + "github.com/ceph/ceph-csi/internal/util" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/stretchr/testify/assert" ) @@ -107,7 +109,7 @@ func TestParseBoolOption(t *testing.T) { } } -func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { +func TestNodeServer_ConstructReadAffinityMapOption(t *testing.T) { t.Parallel() tests := []struct { name string @@ -147,9 +149,10 @@ func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { currentTT := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - ns := &NodeServer{} - ns.SetReadAffinityMapOptions(currentTT.crushLocationmap) - assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions) + ns := &NodeServer{ + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(currentTT.crushLocationmap), + } + assert.Contains(t, currentTT.wantAny, ns.CLIReadAffinityMapOptions) }) } } @@ -236,10 +239,7 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { MapOptions: currentTT.args.mapOptions, Mounter: currentTT.args.mounter, } - ns := &NodeServer{ - readAffinityMapOptions: currentTT.args.readAffinityMapOptions, - } - ns.appendReadAffinityMapOptions(rv) + rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions) assert.Equal(t, currentTT.want, rv.MapOptions) }) } diff --git a/internal/rbd/rbd_attach.go b/internal/rbd/rbd_attach.go index c8d4701e29f6..37d121d79b66 100644 --- a/internal/rbd/rbd_attach.go +++ b/internal/rbd/rbd_attach.go @@ -295,7 +295,7 @@ func parseMapOptions(mapOptions string) (string, string, error) { // getMapOptions is a wrapper func, calls parse map/unmap funcs and feeds the // rbdVolume object. -func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { +func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { krbdMapOptions, nbdMapOptions, err := parseMapOptions(req.GetVolumeContext()["mapOptions"]) if err != nil { return err @@ -312,6 +312,14 @@ func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { rv.UnmapOptions = nbdUnmapOptions } + readAffinityMapOptions, err := util.GetReadAffinityMapOptions( + rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels, + ) + if err != nil { + return err + } + rv.appendReadAffinityMapOptions(readAffinityMapOptions) + return nil } diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go index 5f3751c339d5..4b7a70496b0d 100644 --- a/internal/util/crushlocation.go +++ b/internal/util/crushlocation.go @@ -23,19 +23,14 @@ import ( ) // GetCrushLocationMap returns the crush location map, determined from -// the crush location labels and their values from the CO system. +// the crush location labels and their values from the node labels passed in arg. // Expects crushLocationLabels in arg to be in the format "[prefix/],[prefix/],...",. // Returns map of crush location types with its array of associated values. -func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { +func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) (map[string]string, error) { if crushLocationLabels == "" { return nil, nil } - nodeLabels, err := k8sGetNodeLabels(nodeName) - if err != nil { - return nil, err - } - return getCrushLocationMap(crushLocationLabels, nodeLabels), nil } diff --git a/internal/util/csiconfig.go b/internal/util/csiconfig.go index 48a2e09c1fe4..aad750db3fd3 100644 --- a/internal/util/csiconfig.go +++ b/internal/util/csiconfig.go @@ -64,6 +64,11 @@ type ClusterInfo struct { // symlink filepath for the network namespace where we need to execute commands. NetNamespaceFilePath string `json:"netNamespaceFilePath"` } `json:"nfs"` + // Read affinity map options + ReadAffinity struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + } `json:"readAffinity"` } // Expected JSON structure in the passed in config file is, @@ -209,3 +214,20 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) return cluster.NFS.NetNamespaceFilePath, nil } + +// GetCrushLocationLabels returns the crushLocationLabels from csi config for the given clusterID +// If `readAffinity.enabled` is set to true, else returns an empty string. +func GetCrushLocationLabels(pathToConfig, clusterID string) (bool, string, error) { + cluster, err := readClusterInfo(pathToConfig, clusterID) + if err != nil { + return false, "", err + } + + if !cluster.ReadAffinity.Enabled { + return false, "", nil + } + + crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",") + + return true, crushLocationLabels, nil +} diff --git a/internal/util/csiconfig_test.go b/internal/util/csiconfig_test.go index 66b5c927d982..fb8cd8557c7d 100644 --- a/internal/util/csiconfig_test.go +++ b/internal/util/csiconfig_test.go @@ -365,3 +365,101 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) { }) } } + +func TestGetReadAffinityOptions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + clusterID string + want string + }{ + { + name: "ReadAffinity enabled set to true for cluster-1", + clusterID: "cluster-1", + want: "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack", + }, + { + name: "ReadAffinity enabled set to true for cluster-2", + clusterID: "cluster-2", + want: "topology.kubernetes.io/region", + }, + { + name: "ReadAffinity enabled set to false for cluster-3", + clusterID: "cluster-3", + want: "", + }, + { + name: "ReadAffinity option not set in cluster-4", + clusterID: "cluster-4", + want: "", + }, + } + + csiConfig := []ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + "topology.kubernetes.io/zone", + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-4", + }, + } + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := t.TempDir() + "/ceph-csi.json" + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", CsiConfigFile, err) + } + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + _, got, err := GetCrushLocationLabels(tmpConfPath, tc.clusterID) + if err != nil { + t.Errorf("GetCrushLocationLabels() error = %v", err) + + return + } + if got != tc.want { + t.Errorf("GetCrushLocationLabels() = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/internal/util/k8s/client.go b/internal/util/k8s/client.go index 684fd7090308..0665a5e43b8a 100644 --- a/internal/util/k8s/client.go +++ b/internal/util/k8s/client.go @@ -25,8 +25,14 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +var kubeclient *kubernetes.Clientset + // NewK8sClient create kubernetes client. func NewK8sClient() (*kubernetes.Clientset, error) { + if kubeclient != nil { + return kubeclient, nil + } + var cfg *rest.Config var err error cPath := os.Getenv("KUBERNETES_CONFIG_PATH") @@ -46,5 +52,9 @@ func NewK8sClient() (*kubernetes.Clientset, error) { return nil, fmt.Errorf("failed to create client: %w", err) } + if kubeclient == nil { + kubeclient = client + } + return client, nil } diff --git a/internal/util/k8s/node.go b/internal/util/k8s/node.go new file mode 100644 index 000000000000..ad7760cd8803 --- /dev/null +++ b/internal/util/k8s/node.go @@ -0,0 +1,39 @@ +/* +Copyright 2023 The CephCSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8s + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func GetNodeLabels(nodeName string) (map[string]string, error) { + client, err := NewK8sClient() + if err != nil { + return nil, fmt.Errorf("can not get node %q information, failed "+ + "to connect to Kubernetes: %w", nodeName, err) + } + + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) + } + + return node.GetLabels(), nil +} diff --git a/internal/util/read_affinity.go b/internal/util/read_affinity.go new file mode 100644 index 000000000000..de2a23987d5a --- /dev/null +++ b/internal/util/read_affinity.go @@ -0,0 +1,82 @@ +// /* +// Copyright 2023 The Ceph-CSI Authors. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package util + +import ( + "fmt" + "strings" + + "github.com/ceph/ceph-csi/internal/util/log" +) + +// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap. +// It appends crush location labels in the format +// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...". +func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string { + if len(crushLocationMap) == 0 { + return "" + } + + var b strings.Builder + b.WriteString("read_from_replica=localize,crush_location=") + first := true + for key, val := range crushLocationMap { + if first { + b.WriteString(fmt.Sprintf("%s:%s", key, val)) + first = false + } else { + b.WriteString(fmt.Sprintf("|%s:%s", key, val)) + } + } + + return b.String() +} + +// GetReadAffinityMapOptions retrieves the readAffinityMapOptions from the CSI config file if it exists. +// If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line. +// If neither of these options is available, it returns an empty string. +func GetReadAffinityMapOptions( + clusterID, cliReadAffinityMapOptions string, nodeLabels map[string]string, +) (string, error) { + var ( + err error + configReadAffinityEnabled bool + configCrushLocationLabels string + ) + + configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(CsiConfigFile, clusterID) + if err != nil { + return "", err + } + + if !configReadAffinityEnabled { + return "", nil + } + + if configCrushLocationLabels == "" { + return cliReadAffinityMapOptions, nil + } + + crushLocationMap, err := GetCrushLocationMap(configCrushLocationLabels, nodeLabels) + if err != nil { + log.FatalLogMsg(err.Error()) + } + + readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap) + + return readAffinityMapOptions, nil +} diff --git a/internal/util/topology.go b/internal/util/topology.go index be99dbbe14a0..1f08ca6ff049 100644 --- a/internal/util/topology.go +++ b/internal/util/topology.go @@ -17,16 +17,14 @@ limitations under the License. package util import ( - "context" "encoding/json" "fmt" "strings" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" - - "github.com/container-storage-interface/spec/lib/go/csi" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -34,21 +32,6 @@ const ( labelSeparator string = "," ) -func k8sGetNodeLabels(nodeName string) (map[string]string, error) { - client, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("can not get node %q information, failed "+ - "to connect to Kubernetes: %w", nodeName, err) - } - - node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) - } - - return node.GetLabels(), nil -} - // GetTopologyFromDomainLabels returns the CSI topology map, determined from // the domain labels and their values from the CO system // Expects domainLabels in arg to be in the format "[prefix/],[prefix/],...",. @@ -82,7 +65,7 @@ func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map labelCount++ } - nodeLabels, err := k8sGetNodeLabels(nodeName) + nodeLabels, err := k8s.GetNodeLabels(nodeName) if err != nil { return nil, err } diff --git a/internal/util/util.go b/internal/util/util.go index 659b4032080e..c5832ab09eb1 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -151,6 +151,9 @@ type Config struct { // Read affinity related options EnableReadAffinity bool // enable OSD read affinity. CrushLocationLabels string // list of CRUSH location labels to read from the node. + + // ContainerOrchestrator represents the choice of container orchestration system to be used. + ContainerOrchestrator string } // ValidateDriverName validates the driver name.