diff --git a/charts/ceph-csi-cephfs/values.yaml b/charts/ceph-csi-cephfs/values.yaml index 05bf0beab5b0..b24c6b2e128a 100644 --- a/charts/ceph-csi-cephfs/values.yaml +++ b/charts/ceph-csi-cephfs/values.yaml @@ -28,6 +28,11 @@ serviceAccounts: # cephFS: # subvolumeGroup: "csi" # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true +# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Labels to apply to all resources diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml index 0d43b1671109..64b74e2abd2d 100644 --- a/charts/ceph-csi-rbd/values.yaml +++ b/charts/ceph-csi-rbd/values.yaml @@ -27,6 +27,11 @@ serviceAccounts: # - "" # rbd: # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true +# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Configuration details of clusterID,PoolID and FscID mapping diff --git a/deploy/csi-config-map-sample.yaml b/deploy/csi-config-map-sample.yaml index b48e834a56f6..6397bbc84855 100644 --- a/deploy/csi-config-map-sample.yaml +++ b/deploy/csi-config-map-sample.yaml @@ -66,6 +66,15 @@ data: } "nfs": { "netNamespaceFilePath": "/plugins/nfs.csi.ceph.com/net", + }, + "readAffinity": { + "enabled": "false", + "crushLocationLabels": [ + "", + "" + ... + "" + ] } } ] diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 950a063afce4..6f0fcc0ad080 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -26,6 +26,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" @@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer { func NewNodeServer( d *csicommon.CSIDriver, t string, - topology map[string]string, - crushLocationMap map[string]string, + nodeLabels, topology, crushLocationMap map[string]string, ) (*rbd.NodeServer, error) { ns := rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), - VolumeLocks: util.NewVolumeLocks(), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + VolumeLocks: util.NewVolumeLocks(), + NodeLabels: nodeLabels, + CMDReadAffinityMapOptions: rbd.ConstructReadAffinityMapOption(crushLocationMap), } - ns.SetReadAffinityMapOptions(crushLocationMap) return &ns, nil } @@ -87,8 +88,8 @@ func NewNodeServer( // setupCSIAddonsServer(). func (r *Driver) Run(conf *util.Config) { var ( - err error - topology, crushLocationMap map[string]string + err error + nodeLabels, topology, crushLocationMap map[string]string ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) @@ -125,8 +126,13 @@ func (r *Driver) Run(conf *util.Config) { }) } + nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + if conf.EnableReadAffinity { - crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels) if err != nil { log.FatalLogMsg(err.Error()) } @@ -140,7 +146,7 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) + r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 25f51b35c125..df6ab4b9f830 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,8 +45,10 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks *util.VolumeLocks - // readAffinityMapOptions contains map options to enable read affinity. - readAffinityMapOptions string + // NodeLabels stores the node labels + NodeLabels map[string]string + // cmdreadAffinityMapOptions contains map options passed through command line to enable read affinity. + CMDReadAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed @@ -262,7 +264,9 @@ func (ns *NodeServer) populateRbdVol( if err != nil { return nil, err } - ns.appendReadAffinityMapOptions(rv) + + readAffinityMapOptions := ns.getReadAffinityOptions(rv.ClusterID) + rv.appendReadAffinityMapOptions(readAffinityMapOptions) rv.VolID = volID @@ -280,14 +284,14 @@ func (ns *NodeServer) populateRbdVol( // appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions // if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty. -func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) { +func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) { switch { - case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: + case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: return case rv.MapOptions != "": - rv.MapOptions += "," + ns.readAffinityMapOptions + rv.MapOptions += "," + readAffinityMapOptions default: - rv.MapOptions = ns.readAffinityMapOptions + rv.MapOptions = readAffinityMapOptions } } @@ -1396,9 +1400,42 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) { return size, nil } -func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) { +// getReadAffinityMapOptionFromConfigMap retrieves read affinity map options for cluster `clusterID`. +func (ns *NodeServer) getReadAffinityMapOptionFromConfigMap(clusterID string) string { + crushLocationLabels, err := util.GetReadAffinityOptions(util.CsiConfigFile, clusterID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + + crushLocationMap, err := util.GetCrushLocationMap(crushLocationLabels, ns.NodeLabels) + if err != nil { + log.FatalLogMsg(err.Error()) + } + + readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap) + + return readAffinityMapOptions +} + +// getReadAffinityOptions retrieves the `readAffinityMapOptions` from the CSI config file if it exists. +// If not, it falls back to returning the `readAffinityMapOptions` set in the `cmdReadAffinityMapOptions` +// from the command line. If neither of these options is available, it returns an empty string. +func (ns *NodeServer) getReadAffinityOptions(clusterID string) string { + var readAffinityMapOptions string + readAffinityMapOptions = ns.getReadAffinityMapOptionFromConfigMap(clusterID) + if readAffinityMapOptions == "" { + readAffinityMapOptions = ns.CMDReadAffinityMapOptions + } + + return readAffinityMapOptions +} + +// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap. +// It appends crush location labels in the format +// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...". +func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string { if len(crushLocationMap) == 0 { - return + return "" } var b strings.Builder @@ -1412,5 +1449,6 @@ func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]stri b.WriteString(fmt.Sprintf("|%s:%s", key, val)) } } - ns.readAffinityMapOptions = b.String() + + return b.String() } diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 822232c6fac9..fe5dc1646b35 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -107,7 +107,7 @@ func TestParseBoolOption(t *testing.T) { } } -func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { +func TestNodeServer_SetcmdReadAffinityMapOptions(t *testing.T) { t.Parallel() tests := []struct { name string @@ -147,9 +147,10 @@ func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { currentTT := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - ns := &NodeServer{} - ns.SetReadAffinityMapOptions(currentTT.crushLocationmap) - assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions) + ns := &NodeServer{ + CMDReadAffinityMapOptions: ConstructReadAffinityMapOption(currentTT.crushLocationmap), + } + assert.Contains(t, currentTT.wantAny, ns.CMDReadAffinityMapOptions) }) } } @@ -236,10 +237,7 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { MapOptions: currentTT.args.mapOptions, Mounter: currentTT.args.mounter, } - ns := &NodeServer{ - readAffinityMapOptions: currentTT.args.readAffinityMapOptions, - } - ns.appendReadAffinityMapOptions(rv) + rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions) assert.Equal(t, currentTT.want, rv.MapOptions) }) } diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go index 5f3751c339d5..4b7a70496b0d 100644 --- a/internal/util/crushlocation.go +++ b/internal/util/crushlocation.go @@ -23,19 +23,14 @@ import ( ) // GetCrushLocationMap returns the crush location map, determined from -// the crush location labels and their values from the CO system. +// the crush location labels and their values from the node labels passed in arg. // Expects crushLocationLabels in arg to be in the format "[prefix/],[prefix/],...",. // Returns map of crush location types with its array of associated values. -func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { +func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) (map[string]string, error) { if crushLocationLabels == "" { return nil, nil } - nodeLabels, err := k8sGetNodeLabels(nodeName) - if err != nil { - return nil, err - } - return getCrushLocationMap(crushLocationLabels, nodeLabels), nil } diff --git a/internal/util/csiconfig.go b/internal/util/csiconfig.go index 48a2e09c1fe4..44f198fcdab0 100644 --- a/internal/util/csiconfig.go +++ b/internal/util/csiconfig.go @@ -64,6 +64,11 @@ type ClusterInfo struct { // symlink filepath for the network namespace where we need to execute commands. NetNamespaceFilePath string `json:"netNamespaceFilePath"` } `json:"nfs"` + // Read affinity map options + ReadAffinity struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + } `json:"readAffinity"` } // Expected JSON structure in the passed in config file is, @@ -209,3 +214,24 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) return cluster.NFS.NetNamespaceFilePath, nil } + +// GetReadAffinityOptions returns the crushLocationLabels from csi config for the given clusterID +// If `readAffinity.enabled` is set to true. +func GetReadAffinityOptions(pathToConfig, clusterID string) (string, error) { + cluster, err := readClusterInfo(pathToConfig, clusterID) + if err != nil { + return "", err + } + + if !cluster.ReadAffinity.Enabled { + return "", nil + } + + if len(cluster.ReadAffinity.CrushLocationLabels) == 0 { + return "", fmt.Errorf("empty crush loction labels list for cluster ID (%s) in config", clusterID) + } + + crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",") + + return crushLocationLabels, nil +} diff --git a/internal/util/csiconfig_test.go b/internal/util/csiconfig_test.go index 66b5c927d982..b1e6d883048d 100644 --- a/internal/util/csiconfig_test.go +++ b/internal/util/csiconfig_test.go @@ -365,3 +365,101 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) { }) } } + +func TestGetReadAffinityOptions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + clusterID string + want string + }{ + { + name: "ReadAffinity enabled set to true for cluster-1", + clusterID: "cluster-1", + want: "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack", + }, + { + name: "ReadAffinity enabled set to true for cluster-2", + clusterID: "cluster-2", + want: "topology.kubernetes.io/region", + }, + { + name: "ReadAffinity enabled set to false for cluster-3", + clusterID: "cluster-3", + want: "", + }, + { + name: "ReadAffinity option not set in cluster-4", + clusterID: "cluster-4", + want: "", + }, + } + + csiConfig := []ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + "topology.kubernetes.io/zone", + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-4", + }, + } + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := t.TempDir() + "/ceph-csi.json" + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", CsiConfigFile, err) + } + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, err := GetReadAffinityOptions(tmpConfPath, tc.clusterID) + if err != nil { + t.Errorf("GetReadAffinityOptions() error = %v", err) + + return + } + if got != tc.want { + t.Errorf("GetReadAffinityOptions() = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/internal/util/k8s/node.go b/internal/util/k8s/node.go new file mode 100644 index 000000000000..b4ebac9c42f9 --- /dev/null +++ b/internal/util/k8s/node.go @@ -0,0 +1,23 @@ +package k8s + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func GetNodeLabels(nodeName string) (map[string]string, error) { + client, err := NewK8sClient() + if err != nil { + return nil, fmt.Errorf("can not get node %q information, failed "+ + "to connect to Kubernetes: %w", nodeName, err) + } + + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) + } + + return node.GetLabels(), nil +} diff --git a/internal/util/topology.go b/internal/util/topology.go index be99dbbe14a0..30e34999be34 100644 --- a/internal/util/topology.go +++ b/internal/util/topology.go @@ -17,16 +17,13 @@ limitations under the License. package util import ( - "context" "encoding/json" "fmt" "strings" "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" - "github.com/container-storage-interface/spec/lib/go/csi" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -34,21 +31,6 @@ const ( labelSeparator string = "," ) -func k8sGetNodeLabels(nodeName string) (map[string]string, error) { - client, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("can not get node %q information, failed "+ - "to connect to Kubernetes: %w", nodeName, err) - } - - node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) - } - - return node.GetLabels(), nil -} - // GetTopologyFromDomainLabels returns the CSI topology map, determined from // the domain labels and their values from the CO system // Expects domainLabels in arg to be in the format "[prefix/],[prefix/],...",. @@ -82,7 +64,7 @@ func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map labelCount++ } - nodeLabels, err := k8sGetNodeLabels(nodeName) + nodeLabels, err := k8s.GetNodeLabels(nodeName) if err != nil { return nil, err }