From ad9a2aad9a70b806788ceac0cc75dd5e28ed6911 Mon Sep 17 00:00:00 2001 From: Rakshith R Date: Thu, 2 Feb 2023 15:16:37 +0530 Subject: [PATCH] util: add new cmdline args to enable read affinity This commit adds new cmdline args `enable-read-affinity` and `crush-location-labels` to facilitate new feature of using crush location labels from node labels to enable read affinity using ceph's `read_from_replica=localize` feature. This feature redirect reads to the nearest OSD. This will be currently used only during rbd map cmd. Signed-off-by: Rakshith R --- cmd/cephcsi.go | 7 ++ internal/cephfs/driver.go | 2 +- internal/csi-common/driver.go | 5 +- internal/csi-common/utils.go | 7 +- internal/nfs/nodeserver/nodeserver.go | 2 +- internal/rbd/driver/driver.go | 32 +++++++-- internal/util/crushlocation.go | 72 ++++++++++++++++++++ internal/util/crushlocation_test.go | 98 +++++++++++++++++++++++++++ internal/util/util.go | 4 ++ 9 files changed, 218 insertions(+), 11 deletions(-) create mode 100644 internal/util/crushlocation.go create mode 100644 internal/util/crushlocation_test.go diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index c0a657a0c87c..a06abe8472ae 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -81,6 +81,13 @@ func init() { "", "list of kubernetes node labels, that determines the topology"+ " domain the node belongs to, separated by ','") + flag.BoolVar(&conf.EnableReadAffinity, "enable-read-affinity", false, "enable read affinity") + flag.StringVar( + &conf.CrushLocationLabels, + "crush-location-labels", + "", + "list of kubernetes node labels, that determines the"+ + " crush location the node belongs to, separated by ','") // cephfs related flags flag.BoolVar( diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 5261d12aa22b..96093af148b0 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -72,7 +72,7 @@ func NewNodeServer( fuseMountOptions string, ) *NodeServer { return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology, nil), VolumeLocks: util.NewVolumeLocks(), kernelMountOptions: kernelMountOptions, fuseMountOptions: fuseMountOptions, diff --git a/internal/csi-common/driver.go b/internal/csi-common/driver.go index ee2b42434ec8..d9f5e5d522be 100644 --- a/internal/csi-common/driver.go +++ b/internal/csi-common/driver.go @@ -33,7 +33,10 @@ type CSIDriver struct { nodeID string version string // topology constraints that this nodeserver will advertise - topology map[string]string + topology map[string]string + // crushLocationMap that will be used to enable read affinity. + CrushLocationMap map[string]string + capabilities []*csi.ControllerServiceCapability vc []*csi.VolumeCapability_AccessMode } diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index a2eaa741f8cc..e4c222cb5157 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -58,8 +58,13 @@ func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *c } // NewDefaultNodeServer initializes default node server. -func NewDefaultNodeServer(d *CSIDriver, t string, topology map[string]string) *DefaultNodeServer { +func NewDefaultNodeServer( + d *CSIDriver, + t string, + topology map[string]string, + crushLocationMap map[string]string) *DefaultNodeServer { d.topology = topology + d.CrushLocationMap = crushLocationMap return &DefaultNodeServer{ Driver: d, diff --git a/internal/nfs/nodeserver/nodeserver.go b/internal/nfs/nodeserver/nodeserver.go index c4e7ca84565e..d116515228a2 100644 --- a/internal/nfs/nodeserver/nodeserver.go +++ b/internal/nfs/nodeserver/nodeserver.go @@ -54,7 +54,7 @@ func NewNodeServer( t string, ) *NodeServer { return &NodeServer{ - DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, map[string]string{}), + DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, map[string]string{}, nil), } } diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 76c2dcc84d73..2652775bb7cb 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -71,9 +71,13 @@ func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer { } // NewNodeServer initialize a node server for rbd CSI driver. -func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*rbd.NodeServer, error) { +func NewNodeServer( + d *csicommon.CSIDriver, + t string, + topology map[string]string, + crushLocationMap map[string]string) (*rbd.NodeServer, error) { return &rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology, crushLocationMap), VolumeLocks: util.NewVolumeLocks(), }, nil } @@ -84,9 +88,11 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) // This also configures and starts a new CSI-Addons service, by calling // setupCSIAddonsServer(). func (r *Driver) Run(conf *util.Config) { - var err error - var topology map[string]string - + var ( + err error + topology map[string]string + crushLocationMap map[string]string + ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) rbd.SetGlobalInt("rbdSoftMaxCloneDepth", conf.RbdSoftMaxCloneDepth) @@ -136,7 +142,13 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology) + if conf.EnableReadAffinity { + crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + } + r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } @@ -168,7 +180,13 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology) + if conf.EnableReadAffinity { + crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + } + r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go new file mode 100644 index 000000000000..14112933a2e5 --- /dev/null +++ b/internal/util/crushlocation.go @@ -0,0 +1,72 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "strings" + + "github.com/ceph/ceph-csi/internal/util/log" +) + +// GetCrushLocationMap returns the crush location map, determined from +// the crush location labels and their values from the CO system. +// Expects crushLocationLabels in arg to be in the format "[prefix/],[prefix/],...",. +// Returns map of crush location types with its array of associated values. +func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { + if crushLocationLabels == "" { + return nil, nil + } + + nodeLabels, err := k8sGetNodeLabels(nodeName) + if err != nil { + return nil, err + } + + return getCrushLocationMap(crushLocationLabels, nodeLabels), nil +} + +// getCrushLocationMap returns the crush location map, determined from +// the crush location labels and node labels. +func getCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string { + labelsToRead := strings.SplitN(crushLocationLabels, labelSeparator, -1) + log.DefaultLog("crush location labels passed for processing: %+v", labelsToRead) + + labelsIn := make(map[string]bool) + for _, label := range labelsToRead { + labelsIn[label] = true + } + + // Determine values for requested labels from node labels + crushLocationMap := make(map[string]string) + for key, value := range nodeLabels { + if _, ok := labelsIn[key]; ok { + // label found split name component and store value + nameIdx := strings.IndexRune(key, keySeparator) + crushLocationType := strings.TrimSpace(key[nameIdx+1:]) + // replace "." with "-" to satisfy ceph crush map. + value = strings.Replace(strings.TrimSpace(value), ".", "-", -1) + crushLocationMap[crushLocationType] = value + } + } + + if len(crushLocationMap) == 0 { + return nil + } + log.DefaultLog("list of crush location processed: %+v", crushLocationMap) + + return crushLocationMap +} diff --git a/internal/util/crushlocation_test.go b/internal/util/crushlocation_test.go new file mode 100644 index 000000000000..391d148e0cb1 --- /dev/null +++ b/internal/util/crushlocation_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_getCrushLocationMap(t *testing.T) { + type args struct { + crushLocationLabels string + nodeLabels map[string]string + } + tests := []struct { + name string + args args + want map[string]string + }{ + { + name: "empty crushLocationLabels", + args: args{ + crushLocationLabels: "", + nodeLabels: map[string]string{}, + }, + want: nil, + }, + { + name: "empty nodeLabels", + args: args{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{}, + }, + want: nil, + }, + { + name: "matching crushlocation and node labels", + args: args{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/zone": "zone1", + }, + }, + want: map[string]string{"zone": "zone1"}, + }, + { + name: "multuple matching crushlocation and node labels", + args: args{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/zone": "zone1", + "topology.io/rack": "rack1", + }, + }, + want: map[string]string{"zone": "zone1", "rack": "rack1"}, + }, + { + name: "no match between crushlocation and node labels", + args: args{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/region": "zone1", + }, + }, + want: nil, + }, + { + name: "check crushlocation value replacement to satify ceph requirement", + args: args{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/zone": "south.east.1", + }, + }, + want: map[string]string{"zone": "south-east-1"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, getCrushLocationMap(tt.args.crushLocationLabels, tt.args.nodeLabels)) + }) + } +} diff --git a/internal/util/util.go b/internal/util/util.go index 528b3f1e14e0..5bb3e0c870c1 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -147,6 +147,10 @@ type Config struct { // Cluster name ClusterName string + + // Read affinity related options + EnableReadAffinity bool // enable read affinity. + CrushLocationLabels string // list of crush location labels to read from the node. } // ValidateDriverName validates the driver name.