Skip to content

Commit

Permalink
CSI: implement support for topology (#12129)
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Mar 1, 2022
1 parent 3fd9683 commit 03a8d72
Show file tree
Hide file tree
Showing 29 changed files with 598 additions and 118 deletions.
3 changes: 3 additions & 0 deletions .changelog/12129.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: Added support for storage topology
```
20 changes: 15 additions & 5 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,21 @@ func (w *WriteOptions) SetHeadersFromCSISecrets(secrets CSISecrets) {

// CSIVolume is used for serialization, see also nomad/structs/csi.go
type CSIVolume struct {
ID string
Name string
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string
Topologies []*CSITopology
ID string
Name string
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string

// RequestedTopologies are the topologies submitted as options to
// the storage provider at the time the volume was created. After
// volumes are created, this field is ignored.
RequestedTopologies *CSITopologyRequest `hcl:"topology_request"`

// Topologies are the topologies returned by the storage provider,
// based on the RequestedTopologies and what the storage provider
// could support. This value cannot be set by the user.
Topologies []*CSITopology

AccessMode CSIVolumeAccessMode `hcl:"access_mode"`
AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"`
MountOptions *CSIMountOptions `hcl:"mount_options"`
Expand Down
7 changes: 6 additions & 1 deletion api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,13 @@ type NodeReservedNetworkResources struct {
ReservedHostPorts string
}

type CSITopologyRequest struct {
Required []*CSITopology `hcl:"required"`
Preferred []*CSITopology `hcl:"preferred"`
}

type CSITopology struct {
Segments map[string]string
Segments map[string]string `hcl:"segments"`
}

// CSINodeInfo is the fingerprinted data from a CSI Plugin that is specific to
Expand Down
6 changes: 6 additions & 0 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext

resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology))
for _, topo := range cresp.Volume.AccessibleTopology {
resp.Topologies = append(resp.Topologies,
&nstructs.CSITopology{Segments: topo.Segments})
}

return nil
}

Expand Down
46 changes: 30 additions & 16 deletions client/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,16 @@ type ClientCSIControllerDetachVolumeResponse struct{}
// Nomad client to tell a CSI controller plugin on that client to perform
// CreateVolume
type ClientCSIControllerCreateVolumeRequest struct {
Name string
VolumeCapabilities []*structs.CSIVolumeCapability
MountOptions *structs.CSIMountOptions
Parameters map[string]string
Secrets structs.CSISecrets
CapacityMin int64
CapacityMax int64
SnapshotID string
CloneID string
// TODO: topology is not yet supported
// TopologyRequirement
Name string
VolumeCapabilities []*structs.CSIVolumeCapability
MountOptions *structs.CSIMountOptions
Parameters map[string]string
Secrets structs.CSISecrets
CapacityMin int64
CapacityMax int64
SnapshotID string
CloneID string
RequestedTopologies *structs.CSITopologyRequest

CSIControllerQuery
}
Expand All @@ -237,8 +236,10 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
CloneID: req.CloneID,
SnapshotID: req.SnapshotID,
},
// TODO: topology is not yet supported
AccessibilityRequirements: &csi.TopologyRequirement{},
AccessibilityRequirements: &csi.TopologyRequirement{
Requisite: []*csi.Topology{},
Preferred: []*csi.Topology{},
},
}

// The CSI spec requires that at least one of the fields in CapacityRange
Expand All @@ -258,16 +259,29 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
}
creq.VolumeCapabilities = append(creq.VolumeCapabilities, ccap)
}

if req.RequestedTopologies != nil {
for _, topo := range req.RequestedTopologies.Required {
creq.AccessibilityRequirements.Requisite = append(
creq.AccessibilityRequirements.Requisite, &csi.Topology{
Segments: topo.Segments,
})
}
for _, topo := range req.RequestedTopologies.Preferred {
creq.AccessibilityRequirements.Preferred = append(
creq.AccessibilityRequirements.Preferred, &csi.Topology{
Segments: topo.Segments,
})
}
}
return creq, nil
}

type ClientCSIControllerCreateVolumeResponse struct {
ExternalVolumeID string
CapacityBytes int64
VolumeContext map[string]string

// TODO: topology is not yet supported
// AccessibleTopology []*Topology
Topologies []*structs.CSITopology
}

// ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a
Expand Down
15 changes: 12 additions & 3 deletions command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,13 @@ func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume {
ModifyIndex: vol.ModifyIndex,
}

if vol.RequestedTopologies != nil {
out.RequestedTopologies = &api.CSITopologyRequest{
Preferred: structsCSITopolgiesToApi(vol.RequestedTopologies.Preferred),
Required: structsCSITopolgiesToApi(vol.RequestedTopologies.Required),
}
}

// WriteAllocs and ReadAllocs will only ever contain the Allocation ID,
// with a null value for the Allocation; these IDs are mapped to
// allocation stubs in the Allocations field. This indirection is so the
Expand Down Expand Up @@ -725,9 +732,11 @@ func structsTaskEventToApi(te *structs.TaskEvent) *api.TaskEvent {
func structsCSITopolgiesToApi(tops []*structs.CSITopology) []*api.CSITopology {
out := make([]*api.CSITopology, 0, len(tops))
for _, t := range tops {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
if t != nil {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
}
}

return out
Expand Down
27 changes: 27 additions & 0 deletions command/plugin_status_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, erro
full = append(full, c.Colorize().Color("\n[bold]Node Capabilities[reset]"))
full = append(full, nodeCaps)
}
topos := c.formatTopology(plug.Nodes)
if topos != "" {
full = append(full, c.Colorize().Color("\n[bold]Accessible Topologies[reset]"))
full = append(full, topos)
}

}

// Format the allocs
Expand Down Expand Up @@ -183,6 +189,9 @@ func (c *PluginStatusCommand) formatControllerCaps(controllers map[string]*api.C
func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) string {
caps := []string{}
for _, node := range nodes {
if node.RequiresTopologies {
caps = append(caps, "VOLUME_ACCESSIBILITY_CONSTRAINTS")
}
switch info := node.NodeInfo; {
case info.RequiresNodeStageVolume:
caps = append(caps, "STAGE_UNSTAGE_VOLUME")
Expand All @@ -207,3 +216,21 @@ func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) stri

return " " + strings.Join(sort.StringSlice(caps), "\n ")
}

func (c *PluginStatusCommand) formatTopology(nodes map[string]*api.CSIInfo) string {
rows := []string{"Node ID|Accessible Topology"}
for nodeID, node := range nodes {
if node.NodeInfo.AccessibleTopology != nil {
segments := node.NodeInfo.AccessibleTopology.Segments
segmentPairs := make([]string, 0, len(segments))
for k, v := range segments {
segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v))
}
rows = append(rows, fmt.Sprintf("%s|%s", nodeID[:8], strings.Join(segmentPairs, ",")))
}
}
if len(rows) == 1 {
return ""
}
return formatList(rows)
}
42 changes: 41 additions & 1 deletion command/volume_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ capacity_max = "20G"
# capabilities to validate. Registering an existing volume will record but
# ignore these fields.
capability {
access_mode = "single-node-writer"
access_mode = "single-node-writer"
attachment_mode = "file-system"
}
Expand All @@ -150,6 +150,18 @@ mount_options {
mount_flags = ["ro"]
}
# Optional: specify one or more locations where the volume must be accessible
# from. Refer to the plugin documentation for what segment values are supported.
topology_request {
preferred {
topology { segments { rack = "R1" } }
}
required {
topology { segments { rack = "R1" } }
topology { segments { rack = "R2", zone = "us-east-1a" } }
}
}
# Optional: provide any secrets specified by the plugin.
secrets {
example_secret = "xyzzy"
Expand Down Expand Up @@ -201,6 +213,34 @@ var defaultJsonVolumeSpec = strings.TrimSpace(`
]
}
],
"topology_request": {
"preferred": [
{
"topology": {
"segments": {
"rack": "R1"
}
}
}
],
"required": [
{
"topology": {
"segments": {
"rack": "R1"
}
}
},
{
"topology": {
"segments": {
"rack": "R2",
"zone": "us-east-1a"
}
}
}
]
},
"parameters": [
{
"skuname": "Premium_LRS"
Expand Down
43 changes: 43 additions & 0 deletions command/volume_register_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
delete(m, "mount_options")
delete(m, "capacity_max")
delete(m, "capacity_min")
delete(m, "topology_request")
delete(m, "type")

// Decode the rest
Expand Down Expand Up @@ -116,6 +117,48 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
}
}

requestedTopos := list.Filter("topology_request")
if len(requestedTopos.Items) > 0 {

vol.RequestedTopologies = &api.CSITopologyRequest{}

for _, o := range requestedTopos.Elem().Items {
if err := helper.CheckHCLKeys(o.Val, []string{"preferred", "required"}); err != nil {
return nil, err
}
ot, ok := o.Val.(*ast.ObjectType)
if !ok {
break
}

// topology_request -> required|preferred -> []topology -> []segments (kv)
decoded := map[string][]map[string][]map[string][]map[string]string{}
if err := hcl.DecodeObject(&decoded, ot.List); err != nil {
return nil, err
}

getTopologies := func(topKey string) []*api.CSITopology {
for _, topo := range decoded[topKey] {
var topos []*api.CSITopology
for _, segments := range topo["topology"] {
for _, segment := range segments["segments"] {
if len(segment) > 0 {
topos = append(topos, &api.CSITopology{Segments: segment})
}
}
}
if len(topos) > 0 {
return topos
}
}
return nil
}

vol.RequestedTopologies.Required = getTopologies("required")
vol.RequestedTopologies.Preferred = getTopologies("preferred")
}
}

return vol, nil
}

Expand Down
41 changes: 41 additions & 0 deletions command/volume_register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ capability {
access_mode = "single-node-reader-only"
attachment_mode = "block-device"
}
topology_request {
preferred {
topology { segments {rack = "R1"} }
}
required {
topology { segments {rack = "R1"} }
topology { segments {rack = "R2", zone = "us-east-1a"} }
}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
Expand All @@ -108,6 +119,16 @@ capability {
},
Parameters: map[string]string{"skuname": "Premium_LRS"},
Secrets: map[string]string{"password": "xyzzy"},
RequestedTopologies: &api.CSITopologyRequest{
Required: []*api.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
Preferred: []*api.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
},
Topologies: nil, // this is left empty
},
err: "",
}, {
Expand All @@ -124,6 +145,19 @@ capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}
topology_request {
# make sure we safely handle empty blocks even
# if they're invalid
preferred {
topology {}
topology { segments {} }
}
required {
topology { segments { rack = "R2", zone = "us-east-1a"} }
}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
Expand All @@ -136,6 +170,13 @@ capability {
AttachmentMode: api.CSIVolumeAttachmentModeFilesystem,
},
},
RequestedTopologies: &api.CSITopologyRequest{
Required: []*api.CSITopology{
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
Preferred: nil,
},
Topologies: nil,
},
err: "",
},
Expand Down
Loading

0 comments on commit 03a8d72

Please sign in to comment.