From 3e7df1bca6f36eaf2a9421cc0f3330ed097f900a Mon Sep 17 00:00:00 2001 From: Praveen M Date: Fri, 6 Oct 2023 20:11:49 +0530 Subject: [PATCH] deploy: support for read affinity options per cluster Implemented the capability to include read affinity options for individual clusters within the ceph-csi-config ConfigMap. This allows users to configure the crush location for each cluster separately. The read affinity options specified in the ConfigMap will supersede those provided via command line arguments. Signed-off-by: Praveen M --- charts/ceph-csi-cephfs/values.yaml | 5 ++ charts/ceph-csi-rbd/values.yaml | 5 ++ deploy/csi-config-map-sample.yaml | 9 +++ internal/rbd/driver/driver.go | 24 +++++--- internal/rbd/nodeserver.go | 58 +++++++++++++++--- internal/rbd/nodeserver_test.go | 14 ++--- internal/util/crushlocation.go | 9 +-- internal/util/csiconfig.go | 26 ++++++++ internal/util/csiconfig_test.go | 98 ++++++++++++++++++++++++++++++ internal/util/k8s/node.go | 23 +++++++ internal/util/topology.go | 20 +----- 11 files changed, 238 insertions(+), 53 deletions(-) create mode 100644 internal/util/k8s/node.go diff --git a/charts/ceph-csi-cephfs/values.yaml b/charts/ceph-csi-cephfs/values.yaml index 05bf0beab5b0..b24c6b2e128a 100644 --- a/charts/ceph-csi-cephfs/values.yaml +++ b/charts/ceph-csi-cephfs/values.yaml @@ -28,6 +28,11 @@ serviceAccounts: # cephFS: # subvolumeGroup: "csi" # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true +# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Labels to apply to all resources diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml index 0d43b1671109..64b74e2abd2d 100644 --- a/charts/ceph-csi-rbd/values.yaml +++ b/charts/ceph-csi-rbd/values.yaml @@ -27,6 +27,11 @@ serviceAccounts: # - "" # rbd: # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true 
+# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Configuration details of clusterID,PoolID and FscID mapping diff --git a/deploy/csi-config-map-sample.yaml b/deploy/csi-config-map-sample.yaml index b48e834a56f6..6397bbc84855 100644 --- a/deploy/csi-config-map-sample.yaml +++ b/deploy/csi-config-map-sample.yaml @@ -66,6 +66,15 @@ data: } "nfs": { "netNamespaceFilePath": "/plugins/nfs.csi.ceph.com/net", + }, + "readAffinity": { + "enabled": false, + "crushLocationLabels": [ + "", + "" + ... + "" + ] } } ] diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 950a063afce4..6f0fcc0ad080 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -26,6 +26,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" @@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer { func NewNodeServer( d *csicommon.CSIDriver, t string, - topology map[string]string, - crushLocationMap map[string]string, + nodeLabels, topology, crushLocationMap map[string]string, ) (*rbd.NodeServer, error) { ns := rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), - VolumeLocks: util.NewVolumeLocks(), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + VolumeLocks: util.NewVolumeLocks(), + NodeLabels: nodeLabels, + CMDReadAffinityMapOptions: rbd.ConstructReadAffinityMapOption(crushLocationMap), } - ns.SetReadAffinityMapOptions(crushLocationMap) return &ns, nil } @@ -87,8 +88,8 @@ func NewNodeServer( // setupCSIAddonsServer(). 
func (r *Driver) Run(conf *util.Config) { var ( - err error - topology, crushLocationMap map[string]string + err error + nodeLabels, topology, crushLocationMap map[string]string ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) @@ -125,8 +126,13 @@ func (r *Driver) Run(conf *util.Config) { }) } + nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + if conf.EnableReadAffinity { - crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels) if err != nil { log.FatalLogMsg(err.Error()) } @@ -140,7 +146,7 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) + r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 25f51b35c125..df6ab4b9f830 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,8 +45,10 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks *util.VolumeLocks - // readAffinityMapOptions contains map options to enable read affinity. - readAffinityMapOptions string + // NodeLabels stores the node labels + NodeLabels map[string]string + // CMDReadAffinityMapOptions contains map options passed through command line to enable read affinity. 
+ CMDReadAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed @@ -262,7 +264,9 @@ func (ns *NodeServer) populateRbdVol( if err != nil { return nil, err } - ns.appendReadAffinityMapOptions(rv) + + readAffinityMapOptions := ns.getReadAffinityOptions(rv.ClusterID) + rv.appendReadAffinityMapOptions(readAffinityMapOptions) rv.VolID = volID @@ -280,14 +284,14 @@ func (ns *NodeServer) populateRbdVol( // appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions // if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty. -func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) { +func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) { switch { - case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: + case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: return case rv.MapOptions != "": - rv.MapOptions += "," + ns.readAffinityMapOptions + rv.MapOptions += "," + readAffinityMapOptions default: - rv.MapOptions = ns.readAffinityMapOptions + rv.MapOptions = readAffinityMapOptions } } @@ -1396,9 +1400,42 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) { return size, nil } -func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) { +// getReadAffinityMapOptionFromConfigMap retrieves read affinity map options for cluster `clusterID`. 
+func (ns *NodeServer) getReadAffinityMapOptionFromConfigMap(clusterID string) string { + crushLocationLabels, err := util.GetReadAffinityOptions(util.CsiConfigFile, clusterID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + + crushLocationMap, err := util.GetCrushLocationMap(crushLocationLabels, ns.NodeLabels) + if err != nil { + log.FatalLogMsg(err.Error()) + } + + readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap) + + return readAffinityMapOptions +} + +// getReadAffinityOptions retrieves the `readAffinityMapOptions` from the CSI config file if it exists. +// If not, it falls back to returning the `readAffinityMapOptions` stored in `CMDReadAffinityMapOptions` +// from the command line. If neither of these options is available, it returns an empty string. +func (ns *NodeServer) getReadAffinityOptions(clusterID string) string { + var readAffinityMapOptions string + readAffinityMapOptions = ns.getReadAffinityMapOptionFromConfigMap(clusterID) + if readAffinityMapOptions == "" { + readAffinityMapOptions = ns.CMDReadAffinityMapOptions + } + + return readAffinityMapOptions +} + +// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap. +// It appends crush location labels in the format +// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...". 
+func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string { if len(crushLocationMap) == 0 { - return + return "" } var b strings.Builder @@ -1412,5 +1449,6 @@ func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]stri b.WriteString(fmt.Sprintf("|%s:%s", key, val)) } } - ns.readAffinityMapOptions = b.String() + + return b.String() } diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 822232c6fac9..fe5dc1646b35 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -107,7 +107,7 @@ func TestParseBoolOption(t *testing.T) { } } -func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { +func TestNodeServer_SetcmdReadAffinityMapOptions(t *testing.T) { t.Parallel() tests := []struct { name string @@ -147,9 +147,10 @@ func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { currentTT := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - ns := &NodeServer{} - ns.SetReadAffinityMapOptions(currentTT.crushLocationmap) - assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions) + ns := &NodeServer{ + CMDReadAffinityMapOptions: ConstructReadAffinityMapOption(currentTT.crushLocationmap), + } + assert.Contains(t, currentTT.wantAny, ns.CMDReadAffinityMapOptions) }) } } @@ -236,10 +237,7 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { MapOptions: currentTT.args.mapOptions, Mounter: currentTT.args.mounter, } - ns := &NodeServer{ - readAffinityMapOptions: currentTT.args.readAffinityMapOptions, - } - ns.appendReadAffinityMapOptions(rv) + rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions) assert.Equal(t, currentTT.want, rv.MapOptions) }) } diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go index 5f3751c339d5..4b7a70496b0d 100644 --- a/internal/util/crushlocation.go +++ b/internal/util/crushlocation.go @@ -23,19 +23,14 @@ import ( ) // GetCrushLocationMap returns the crush location map, determined 
from -// the crush location labels and their values from the CO system. +// the crush location labels and their values from the node labels passed in arg. // Expects crushLocationLabels in arg to be in the format "[prefix/],[prefix/],...",. // Returns map of crush location types with its array of associated values. -func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { +func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) (map[string]string, error) { if crushLocationLabels == "" { return nil, nil } - nodeLabels, err := k8sGetNodeLabels(nodeName) - if err != nil { - return nil, err - } - return getCrushLocationMap(crushLocationLabels, nodeLabels), nil } diff --git a/internal/util/csiconfig.go b/internal/util/csiconfig.go index 48a2e09c1fe4..44f198fcdab0 100644 --- a/internal/util/csiconfig.go +++ b/internal/util/csiconfig.go @@ -64,6 +64,11 @@ type ClusterInfo struct { // symlink filepath for the network namespace where we need to execute commands. NetNamespaceFilePath string `json:"netNamespaceFilePath"` } `json:"nfs"` + // Read affinity map options + ReadAffinity struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + } `json:"readAffinity"` } // Expected JSON structure in the passed in config file is, @@ -209,3 +214,24 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) return cluster.NFS.NetNamespaceFilePath, nil } + +// GetReadAffinityOptions returns the crushLocationLabels from csi config for the given clusterID +// if `readAffinity.enabled` is set to true; otherwise it returns an empty string. 
+func GetReadAffinityOptions(pathToConfig, clusterID string) (string, error) { + cluster, err := readClusterInfo(pathToConfig, clusterID) + if err != nil { + return "", err + } + + if !cluster.ReadAffinity.Enabled { + return "", nil + } + + if len(cluster.ReadAffinity.CrushLocationLabels) == 0 { + return "", fmt.Errorf("empty crush location labels list for cluster ID (%s) in config", clusterID) + } + + crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",") + + return crushLocationLabels, nil +} diff --git a/internal/util/csiconfig_test.go b/internal/util/csiconfig_test.go index 66b5c927d982..b1e6d883048d 100644 --- a/internal/util/csiconfig_test.go +++ b/internal/util/csiconfig_test.go @@ -365,3 +365,101 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) { }) } } + +func TestGetReadAffinityOptions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + clusterID string + want string + }{ + { + name: "ReadAffinity enabled set to true for cluster-1", + clusterID: "cluster-1", + want: "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack", + }, + { + name: "ReadAffinity enabled set to true for cluster-2", + clusterID: "cluster-2", + want: "topology.kubernetes.io/region", + }, + { + name: "ReadAffinity enabled set to false for cluster-3", + clusterID: "cluster-3", + want: "", + }, + { + name: "ReadAffinity option not set in cluster-4", + clusterID: "cluster-4", + want: "", + }, + } + + csiConfig := []ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + "topology.kubernetes.io/zone", + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: 
[]string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-4", + }, + } + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := t.TempDir() + "/ceph-csi.json" + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", CsiConfigFile, err) + } + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, err := GetReadAffinityOptions(tmpConfPath, tc.clusterID) + if err != nil { + t.Errorf("GetReadAffinityOptions() error = %v", err) + + return + } + if got != tc.want { + t.Errorf("GetReadAffinityOptions() = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/internal/util/k8s/node.go b/internal/util/k8s/node.go new file mode 100644 index 000000000000..b4ebac9c42f9 --- /dev/null +++ b/internal/util/k8s/node.go @@ -0,0 +1,23 @@ +package k8s + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func GetNodeLabels(nodeName string) (map[string]string, error) { + client, err := NewK8sClient() + if err != nil { + return nil, fmt.Errorf("can not get node %q information, failed "+ + "to connect to Kubernetes: %w", nodeName, err) + } + + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) + } + + return node.GetLabels(), nil +} diff --git a/internal/util/topology.go b/internal/util/topology.go index be99dbbe14a0..30e34999be34 100644 --- a/internal/util/topology.go +++ b/internal/util/topology.go @@ -17,16 +17,13 @@ limitations under the License. 
package util import ( - "context" "encoding/json" "fmt" "strings" "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" - "github.com/container-storage-interface/spec/lib/go/csi" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -34,21 +31,6 @@ const ( labelSeparator string = "," ) -func k8sGetNodeLabels(nodeName string) (map[string]string, error) { - client, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("can not get node %q information, failed "+ - "to connect to Kubernetes: %w", nodeName, err) - } - - node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) - } - - return node.GetLabels(), nil -} - // GetTopologyFromDomainLabels returns the CSI topology map, determined from // the domain labels and their values from the CO system // Expects domainLabels in arg to be in the format "[prefix/],[prefix/],...",. @@ -82,7 +64,7 @@ func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map labelCount++ } - nodeLabels, err := k8sGetNodeLabels(nodeName) + nodeLabels, err := k8s.GetNodeLabels(nodeName) if err != nil { return nil, err }