Skip to content

Commit

Permalink
util: add new cmdline args to enable read affinity
Browse files Browse the repository at this point in the history
This commit adds new cmdline args `enable-read-affinity`
and `crush-location-labels` to facilitate new feature of
using crush location labels from node labels to enable
read affinity using ceph's `read_from_replica=localize`
feature. This feature redirect reads to the nearest OSD.
This will be currently used only during rbd map cmd.

Signed-off-by: Rakshith R <rar@redhat.com>
  • Loading branch information
Rakshith-R committed Feb 2, 2023
1 parent c3d5b78 commit ad9a2aa
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 11 deletions.
7 changes: 7 additions & 0 deletions cmd/cephcsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ func init() {
"",
"list of kubernetes node labels, that determines the topology"+
" domain the node belongs to, separated by ','")
flag.BoolVar(&conf.EnableReadAffinity, "enable-read-affinity", false, "enable read affinity")
flag.StringVar(
&conf.CrushLocationLabels,
"crush-location-labels",
"",
"list of kubernetes node labels, that determines the"+
" crush location the node belongs to, separated by ','")

// cephfs related flags
flag.BoolVar(
Expand Down
2 changes: 1 addition & 1 deletion internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewNodeServer(
fuseMountOptions string,
) *NodeServer {
return &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology, nil),
VolumeLocks: util.NewVolumeLocks(),
kernelMountOptions: kernelMountOptions,
fuseMountOptions: fuseMountOptions,
Expand Down
5 changes: 4 additions & 1 deletion internal/csi-common/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type CSIDriver struct {
nodeID string
version string
// topology constraints that this nodeserver will advertise
topology map[string]string
topology map[string]string
// crushLocationMap that will be used to enable read affinity.
CrushLocationMap map[string]string

capabilities []*csi.ControllerServiceCapability
vc []*csi.VolumeCapability_AccessMode
}
Expand Down
7 changes: 6 additions & 1 deletion internal/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *c
}

// NewDefaultNodeServer initializes default node server.
func NewDefaultNodeServer(d *CSIDriver, t string, topology map[string]string) *DefaultNodeServer {
func NewDefaultNodeServer(
d *CSIDriver,
t string,
topology map[string]string,
crushLocationMap map[string]string) *DefaultNodeServer {
d.topology = topology
d.CrushLocationMap = crushLocationMap

return &DefaultNodeServer{
Driver: d,
Expand Down
2 changes: 1 addition & 1 deletion internal/nfs/nodeserver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewNodeServer(
t string,
) *NodeServer {
return &NodeServer{
DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, map[string]string{}),
DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, map[string]string{}, nil),
}
}

Expand Down
32 changes: 25 additions & 7 deletions internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,13 @@ func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer {
}

// NewNodeServer initialize a node server for rbd CSI driver.
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*rbd.NodeServer, error) {
func NewNodeServer(
d *csicommon.CSIDriver,
t string,
topology map[string]string,
crushLocationMap map[string]string) (*rbd.NodeServer, error) {
return &rbd.NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology, crushLocationMap),
VolumeLocks: util.NewVolumeLocks(),
}, nil
}
Expand All @@ -84,9 +88,11 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string)
// This also configures and starts a new CSI-Addons service, by calling
// setupCSIAddonsServer().
func (r *Driver) Run(conf *util.Config) {
var err error
var topology map[string]string

var (
err error
topology map[string]string
crushLocationMap map[string]string
)
// update clone soft and hard limit
rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth)
rbd.SetGlobalInt("rbdSoftMaxCloneDepth", conf.RbdSoftMaxCloneDepth)
Expand Down Expand Up @@ -136,7 +142,13 @@ func (r *Driver) Run(conf *util.Config) {
if err != nil {
log.FatalLogMsg(err.Error())
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology)
if conf.EnableReadAffinity {
crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID)
if err != nil {
log.FatalLogMsg(err.Error())
}
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap)
if err != nil {
log.FatalLogMsg("failed to start node server, err %v\n", err)
}
Expand Down Expand Up @@ -168,7 +180,13 @@ func (r *Driver) Run(conf *util.Config) {
if err != nil {
log.FatalLogMsg(err.Error())
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology)
if conf.EnableReadAffinity {
crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID)
if err != nil {
log.FatalLogMsg(err.Error())
}
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap)
if err != nil {
log.FatalLogMsg("failed to start node server, err %v\n", err)
}
Expand Down
72 changes: 72 additions & 0 deletions internal/util/crushlocation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2023 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"strings"

"github.com/ceph/ceph-csi/internal/util/log"
)

// GetCrushLocationMap returns the crush location map, determined from
// the crush location labels and their values from the CO system.
// Expects crushLocationLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...",.
// Returns map of crush location types with its array of associated values.
func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) {
if crushLocationLabels == "" {
return nil, nil
}

nodeLabels, err := k8sGetNodeLabels(nodeName)
if err != nil {
return nil, err
}

return getCrushLocationMap(crushLocationLabels, nodeLabels), nil
}

// getCrushLocationMap returns the crush location map, determined from
// the crush location labels and node labels.
func getCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string {
labelsToRead := strings.SplitN(crushLocationLabels, labelSeparator, -1)
log.DefaultLog("crush location labels passed for processing: %+v", labelsToRead)

labelsIn := make(map[string]bool)
for _, label := range labelsToRead {
labelsIn[label] = true
}

// Determine values for requested labels from node labels
crushLocationMap := make(map[string]string)
for key, value := range nodeLabels {
if _, ok := labelsIn[key]; ok {
// label found split name component and store value
nameIdx := strings.IndexRune(key, keySeparator)
crushLocationType := strings.TrimSpace(key[nameIdx+1:])
// replace "." with "-" to satisfy ceph crush map.
value = strings.Replace(strings.TrimSpace(value), ".", "-", -1)
crushLocationMap[crushLocationType] = value
}
}

if len(crushLocationMap) == 0 {
return nil
}
log.DefaultLog("list of crush location processed: %+v", crushLocationMap)

return crushLocationMap
}
98 changes: 98 additions & 0 deletions internal/util/crushlocation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright 2023 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_getCrushLocationMap(t *testing.T) {
type args struct {
crushLocationLabels string
nodeLabels map[string]string
}
tests := []struct {
name string
args args
want map[string]string
}{
{
name: "empty crushLocationLabels",
args: args{
crushLocationLabels: "",
nodeLabels: map[string]string{},
},
want: nil,
},
{
name: "empty nodeLabels",
args: args{
crushLocationLabels: "topology.io/zone,topology.io/rack",
nodeLabels: map[string]string{},
},
want: nil,
},
{
name: "matching crushlocation and node labels",
args: args{
crushLocationLabels: "topology.io/zone,topology.io/rack",
nodeLabels: map[string]string{
"topology.io/zone": "zone1",
},
},
want: map[string]string{"zone": "zone1"},
},
{
name: "multuple matching crushlocation and node labels",
args: args{
crushLocationLabels: "topology.io/zone,topology.io/rack",
nodeLabels: map[string]string{
"topology.io/zone": "zone1",
"topology.io/rack": "rack1",
},
},
want: map[string]string{"zone": "zone1", "rack": "rack1"},
},
{
name: "no match between crushlocation and node labels",
args: args{
crushLocationLabels: "topology.io/zone,topology.io/rack",
nodeLabels: map[string]string{
"topology.io/region": "zone1",
},
},
want: nil,
},
{
name: "check crushlocation value replacement to satify ceph requirement",
args: args{
crushLocationLabels: "topology.io/zone,topology.io/rack",
nodeLabels: map[string]string{
"topology.io/zone": "south.east.1",
},
},
want: map[string]string{"zone": "south-east-1"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, getCrushLocationMap(tt.args.crushLocationLabels, tt.args.nodeLabels))
})
}
}
4 changes: 4 additions & 0 deletions internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ type Config struct {

// Cluster name
ClusterName string

// Read affinity related options
EnableReadAffinity bool // enable read affinity.
CrushLocationLabels string // list of crush location labels to read from the node.
}

// ValidateDriverName validates the driver name.
Expand Down

0 comments on commit ad9a2aa

Please sign in to comment.