From c9b845d5d15e23386ad779bcf0c378887bcd6177 Mon Sep 17 00:00:00 2001 From: Praveen M Date: Fri, 6 Oct 2023 20:11:49 +0530 Subject: [PATCH] deploy: support for read affinity options per cluster Implemented the capability to include read affinity options for individual clusters within the ceph-csi-config ConfigMap. This allows users to configure the crush location for each cluster separately. The read affinity options specified in the ConfigMap will supersede those provided via command line arguments. Signed-off-by: Praveen M --- .../templates/nodeplugin-daemonset.yaml | 2 +- charts/ceph-csi-rbd/values.yaml | 5 + cmd/cephcsi.go | 1 + deploy/csi-config-map-sample.yaml | 9 ++ docs/deploy-rbd.md | 11 ++- internal/rbd/driver/driver.go | 26 +++-- internal/rbd/nodeserver.go | 36 ++----- internal/rbd/nodeserver_test.go | 16 +-- internal/rbd/rbd_attach.go | 8 +- internal/util/crushlocation.go | 9 +- internal/util/csiconfig.go | 22 +++++ internal/util/csiconfig_test.go | 98 +++++++++++++++++++ internal/util/k8s/client.go | 10 ++ internal/util/k8s/node.go | 39 ++++++++ internal/util/read_affinity.go | 82 ++++++++++++++++ internal/util/topology.go | 23 +---- internal/util/util.go | 3 + 17 files changed, 324 insertions(+), 76 deletions(-) create mode 100644 internal/util/k8s/node.go create mode 100644 internal/util/read_affinity.go diff --git a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml index 18a5929beeed..426418af136c 100644 --- a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml +++ b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml @@ -87,7 +87,7 @@ spec: {{- if .Values.nodeplugin.profiling.enabled }} - "--enableprofiling={{ .Values.nodeplugin.profiling.enabled }}" {{- end }} - - "--enable-read-affinity={{ and .Values.readAffinity .Values.readAffinity.enabled }}" +- "--enable-read-affinity={{ and .Values.readAffinity .Values.readAffinity.enabled }}" {{- if and .Values.readAffinity .Values.readAffinity.enabled }} - "--crush-location-labels={{ .Values.readAffinity.crushLocationLabels | join "," }}" {{- end }} diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml index 0d43b1671109..64b74e2abd2d 100644 --- a/charts/ceph-csi-rbd/values.yaml +++ b/charts/ceph-csi-rbd/values.yaml @@ -27,6 +27,11 @@ serviceAccounts: # - "" # rbd: # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true +# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Configuration details of clusterID,PoolID and FscID mapping diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index dac23d21e44a..e89a5986fa32 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -88,6 +88,7 @@ func init() { "", "list of Kubernetes node labels, that determines the"+ " CRUSH location the node belongs to, separated by ','") + flag.StringVar(&conf.ContainerOrchestrator, "container-orchestrator", "kubernetes", "container orchestrator [kubernetes]") // cephfs related flags flag.BoolVar( diff --git a/deploy/csi-config-map-sample.yaml b/deploy/csi-config-map-sample.yaml index b48e834a56f6..6397bbc84855 100644 --- a/deploy/csi-config-map-sample.yaml +++ b/deploy/csi-config-map-sample.yaml @@ -66,6 +66,15 @@ data: } "nfs": { "netNamespaceFilePath": "/plugins/nfs.csi.ceph.com/net", + }, + "readAffinity": { + "enabled": "false", + "crushLocationLabels": [ + "", + "" + ... + "" + ] } } ] diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index d542fe3035df..2ff23f97ac37 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -48,8 +48,7 @@ make image-cephcsi | `--skipforceflatten` | `false` | skip image flattening on kernel < 5.2 which support mapping of rbd images which has the deep-flatten feature | | `--maxsnapshotsonimage` | `450` | Maximum number of snapshots allowed on rbd image without flattening | | `--setmetadata` | `false` | Set metadata on volume | -| `--enable-read-affinity` | `false` | enable read affinity | -| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ',' | +| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.
`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` | **Available volume parameters:** @@ -215,7 +214,7 @@ for more details. This can be enabled by adding labels to Kubernetes nodes like `"topology.io/region=east"` and `"topology.io/zone=east-zone1"` and -passing command line arguments `"--enable-read-affinity=true"` and +passing command line argument `"--crush-location-labels=topology.io/zone,topology.io/region"` to Ceph CSI RBD daemonset pod "csi-rbdplugin" container, resulting in Ceph CSI adding `"--options read_from_replica=localize,crush_location=zone:east-zone1|region:east"` @@ -224,6 +223,12 @@ If enabled, this option will be added to all RBD volumes mapped by Ceph CSI. Well known labels can be found [here](https://kubernetes.io/docs/reference/labels-annotations-taints/). +Read affinity can be configured for individual clusters within the +`ceph-csi-config` ConfigMap. This allows configuring the crush location labels +for each ceph cluster separately. The crush location labels specified in the +ConfigMap will supersede those provided via command line argument +`--crush-location-labels`. + >Note: Label values will have all its dots `"."` normalized with dashes `"-"` in order for it to work with ceph CRUSH map. diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 950a063afce4..112dcdbc2794 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -26,6 +26,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" @@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer { func NewNodeServer( d *csicommon.CSIDriver, t string, - topology map[string]string, - crushLocationMap map[string]string, + nodeLabels, topology, crushLocationMap map[string]string, ) (*rbd.NodeServer, error) { ns := rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), - VolumeLocks: util.NewVolumeLocks(), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + VolumeLocks: util.NewVolumeLocks(), + NodeLabels: nodeLabels, + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), } - ns.SetReadAffinityMapOptions(crushLocationMap) return &ns, nil } @@ -87,8 +88,8 @@ func NewNodeServer( // setupCSIAddonsServer(). func (r *Driver) Run(conf *util.Config) { var ( - err error - topology, crushLocationMap map[string]string + err error + nodeLabels, topology, crushLocationMap map[string]string ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) @@ -125,8 +126,15 @@ func (r *Driver) Run(conf *util.Config) { }) } + if conf.ContainerOrchestrator == "kubernetes" { + nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + } + if conf.EnableReadAffinity { - crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels) if err != nil { log.FatalLogMsg(err.Error()) } @@ -140,7 +148,7 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) + r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 25f51b35c125..96e5ed84aa4d 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,8 +45,10 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks *util.VolumeLocks - // readAffinityMapOptions contains map options to enable read affinity. - readAffinityMapOptions string + // NodeLabels stores the node labels + NodeLabels map[string]string + // CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity. + CLIReadAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed @@ -258,11 +260,10 @@ func (ns *NodeServer) populateRbdVol( rv.Mounter = rbdNbdMounter } - err = getMapOptions(req, rv) + err = ns.getMapOptions(req, rv) if err != nil { return nil, err } - ns.appendReadAffinityMapOptions(rv) rv.VolID = volID @@ -280,14 +281,14 @@ func (ns *NodeServer) populateRbdVol( // appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions // if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty. -func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) { +func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) { switch { - case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: + case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: return case rv.MapOptions != "": - rv.MapOptions += "," + ns.readAffinityMapOptions + rv.MapOptions += "," + readAffinityMapOptions default: - rv.MapOptions = ns.readAffinityMapOptions + rv.MapOptions = readAffinityMapOptions } } @@ -1395,22 +1396,3 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) { return size, nil } - -func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) { - if len(crushLocationMap) == 0 { - return - } - - var b strings.Builder - b.WriteString("read_from_replica=localize,crush_location=") - first := true - for key, val := range crushLocationMap { - if first { - b.WriteString(fmt.Sprintf("%s:%s", key, val)) - first = false - } else { - b.WriteString(fmt.Sprintf("|%s:%s", key, val)) - } - } - ns.readAffinityMapOptions = b.String() -} diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 822232c6fac9..36ce8c292048 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -20,6 +20,8 @@ import ( "context" "testing" + "github.com/ceph/ceph-csi/internal/util" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/stretchr/testify/assert" ) @@ -107,7 +109,7 @@ func TestParseBoolOption(t *testing.T) { } } -func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { +func TestNodeServer_ConstructReadAffinityMapOption(t *testing.T) { t.Parallel() tests := []struct { name string @@ -147,9 +149,10 @@ func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { currentTT := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - ns := &NodeServer{} - ns.SetReadAffinityMapOptions(currentTT.crushLocationmap) - assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions) + ns := &NodeServer{ + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(currentTT.crushLocationmap), + } + assert.Contains(t, currentTT.wantAny, ns.CLIReadAffinityMapOptions) }) } } @@ -236,10 +239,7 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { MapOptions: currentTT.args.mapOptions, Mounter: currentTT.args.mounter, } - ns := &NodeServer{ - readAffinityMapOptions: currentTT.args.readAffinityMapOptions, - } - ns.appendReadAffinityMapOptions(rv) + rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions) assert.Equal(t, currentTT.want, rv.MapOptions) }) } diff --git a/internal/rbd/rbd_attach.go b/internal/rbd/rbd_attach.go index c8d4701e29f6..5baff564804a 100644 --- a/internal/rbd/rbd_attach.go +++ b/internal/rbd/rbd_attach.go @@ -295,7 +295,7 @@ func parseMapOptions(mapOptions string) (string, string, error) { // getMapOptions is a wrapper func, calls parse map/unmap funcs and feeds the // rbdVolume object. -func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { +func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { krbdMapOptions, nbdMapOptions, err := parseMapOptions(req.GetVolumeContext()["mapOptions"]) if err != nil { return err @@ -312,6 +312,12 @@ func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { rv.UnmapOptions = nbdUnmapOptions } + readAffinityMapOptions, err := util.GetReadAffinityMapOptions(rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels) + if err != nil { + return err + } + rv.appendReadAffinityMapOptions(readAffinityMapOptions) + return nil } diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go index 5f3751c339d5..4b7a70496b0d 100644 --- a/internal/util/crushlocation.go +++ b/internal/util/crushlocation.go @@ -23,19 +23,14 @@ import ( ) // GetCrushLocationMap returns the crush location map, determined from -// the crush location labels and their values from the CO system. +// the crush location labels and their values from the node labels passed in arg. // Expects crushLocationLabels in arg to be in the format "[prefix/],[prefix/],...",. // Returns map of crush location types with its array of associated values. -func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { +func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) (map[string]string, error) { if crushLocationLabels == "" { return nil, nil } - nodeLabels, err := k8sGetNodeLabels(nodeName) - if err != nil { - return nil, err - } - return getCrushLocationMap(crushLocationLabels, nodeLabels), nil } diff --git a/internal/util/csiconfig.go b/internal/util/csiconfig.go index 48a2e09c1fe4..aad750db3fd3 100644 --- a/internal/util/csiconfig.go +++ b/internal/util/csiconfig.go @@ -64,6 +64,11 @@ type ClusterInfo struct { // symlink filepath for the network namespace where we need to execute commands. NetNamespaceFilePath string `json:"netNamespaceFilePath"` } `json:"nfs"` + // Read affinity map options + ReadAffinity struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + } `json:"readAffinity"` } // Expected JSON structure in the passed in config file is, @@ -209,3 +214,20 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) return cluster.NFS.NetNamespaceFilePath, nil } + +// GetCrushLocationLabels returns the crushLocationLabels from csi config for the given clusterID +// If `readAffinity.enabled` is set to true, else returns an empty string. +func GetCrushLocationLabels(pathToConfig, clusterID string) (bool, string, error) { + cluster, err := readClusterInfo(pathToConfig, clusterID) + if err != nil { + return false, "", err + } + + if !cluster.ReadAffinity.Enabled { + return false, "", nil + } + + crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",") + + return true, crushLocationLabels, nil +} diff --git a/internal/util/csiconfig_test.go b/internal/util/csiconfig_test.go index 66b5c927d982..fb8cd8557c7d 100644 --- a/internal/util/csiconfig_test.go +++ b/internal/util/csiconfig_test.go @@ -365,3 +365,101 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) { }) } } + +func TestGetReadAffinityOptions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + clusterID string + want string + }{ + { + name: "ReadAffinity enabled set to true for cluster-1", + clusterID: "cluster-1", + want: "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack", + }, + { + name: "ReadAffinity enabled set to true for cluster-2", + clusterID: "cluster-2", + want: "topology.kubernetes.io/region", + }, + { + name: "ReadAffinity enabled set to false for cluster-3", + clusterID: "cluster-3", + want: "", + }, + { + name: "ReadAffinity option not set in cluster-4", + clusterID: "cluster-4", + want: "", + }, + } + + csiConfig := []ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + "topology.kubernetes.io/zone", + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-4", + }, + } + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := t.TempDir() + "/ceph-csi.json" + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", CsiConfigFile, err) + } + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + _, got, err := GetCrushLocationLabels(tmpConfPath, tc.clusterID) + if err != nil { + t.Errorf("GetCrushLocationLabels() error = %v", err) + + return + } + if got != tc.want { + t.Errorf("GetCrushLocationLabels() = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/internal/util/k8s/client.go b/internal/util/k8s/client.go index 684fd7090308..0665a5e43b8a 100644 --- a/internal/util/k8s/client.go +++ b/internal/util/k8s/client.go @@ -25,8 +25,14 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +var kubeclient *kubernetes.Clientset + // NewK8sClient create kubernetes client. func NewK8sClient() (*kubernetes.Clientset, error) { + if kubeclient != nil { + return kubeclient, nil + } + var cfg *rest.Config var err error cPath := os.Getenv("KUBERNETES_CONFIG_PATH") @@ -46,5 +52,9 @@ func NewK8sClient() (*kubernetes.Clientset, error) { return nil, fmt.Errorf("failed to create client: %w", err) } + if kubeclient == nil { + kubeclient = client + } + return client, nil } diff --git a/internal/util/k8s/node.go b/internal/util/k8s/node.go new file mode 100644 index 000000000000..ad7760cd8803 --- /dev/null +++ b/internal/util/k8s/node.go @@ -0,0 +1,39 @@ +/* +Copyright 2023 The CephCSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8s + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func GetNodeLabels(nodeName string) (map[string]string, error) { + client, err := NewK8sClient() + if err != nil { + return nil, fmt.Errorf("can not get node %q information, failed "+ + "to connect to Kubernetes: %w", nodeName, err) + } + + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) + } + + return node.GetLabels(), nil +} diff --git a/internal/util/read_affinity.go b/internal/util/read_affinity.go new file mode 100644 index 000000000000..de2a23987d5a --- /dev/null +++ b/internal/util/read_affinity.go @@ -0,0 +1,82 @@ +// /* +// Copyright 2023 The Ceph-CSI Authors. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package util + +import ( + "fmt" + "strings" + + "github.com/ceph/ceph-csi/internal/util/log" +) + +// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap. +// It appends crush location labels in the format +// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...". +func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string { + if len(crushLocationMap) == 0 { + return "" + } + + var b strings.Builder + b.WriteString("read_from_replica=localize,crush_location=") + first := true + for key, val := range crushLocationMap { + if first { + b.WriteString(fmt.Sprintf("%s:%s", key, val)) + first = false + } else { + b.WriteString(fmt.Sprintf("|%s:%s", key, val)) + } + } + + return b.String() +} + +// GetReadAffinityMapOptions retrieves the readAffinityMapOptions from the CSI config file if it exists. +// If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line. +// If neither of these options is available, it returns an empty string. +func GetReadAffinityMapOptions( + clusterID, cliReadAffinityMapOptions string, nodeLabels map[string]string, +) (string, error) { + var ( + err error + configReadAffinityEnabled bool + configCrushLocationLabels string + ) + + configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(CsiConfigFile, clusterID) + if err != nil { + return "", err + } + + if !configReadAffinityEnabled { + return "", nil + } + + if configCrushLocationLabels == "" { + return cliReadAffinityMapOptions, nil + } + + crushLocationMap, err := GetCrushLocationMap(configCrushLocationLabels, nodeLabels) + if err != nil { + log.FatalLogMsg(err.Error()) + } + + readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap) + + return readAffinityMapOptions, nil +} diff --git a/internal/util/topology.go b/internal/util/topology.go index be99dbbe14a0..1f08ca6ff049 100644 --- a/internal/util/topology.go +++ b/internal/util/topology.go @@ -17,16 +17,14 @@ limitations under the License. package util import ( - "context" "encoding/json" "fmt" "strings" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" - - "github.com/container-storage-interface/spec/lib/go/csi" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -34,21 +32,6 @@ const ( labelSeparator string = "," ) -func k8sGetNodeLabels(nodeName string) (map[string]string, error) { - client, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("can not get node %q information, failed "+ - "to connect to Kubernetes: %w", nodeName, err) - } - - node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) - } - - return node.GetLabels(), nil -} - // GetTopologyFromDomainLabels returns the CSI topology map, determined from // the domain labels and their values from the CO system // Expects domainLabels in arg to be in the format "[prefix/],[prefix/],...",. @@ -82,7 +65,7 @@ func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map labelCount++ } - nodeLabels, err := k8sGetNodeLabels(nodeName) + nodeLabels, err := k8s.GetNodeLabels(nodeName) if err != nil { return nil, err } diff --git a/internal/util/util.go b/internal/util/util.go index 659b4032080e..c5832ab09eb1 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -151,6 +151,9 @@ type Config struct { // Read affinity related options EnableReadAffinity bool // enable OSD read affinity. CrushLocationLabels string // list of CRUSH location labels to read from the node. + + // ContainerOrchestrator represents the choice of container orchestration system to be used. + ContainerOrchestrator string } // ValidateDriverName validates the driver name.