Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: implement support for topology #12129

Merged
merged 9 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12129.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: Added support for storage topology
```
20 changes: 15 additions & 5 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,21 @@ type CSISecrets map[string]string

// CSIVolume is used for serialization, see also nomad/structs/csi.go
type CSIVolume struct {
ID string
Name string
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string
Topologies []*CSITopology
ID string
Name string
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string

// RequestedTopologies are the topologies submitted as options to
// the storage provider at the time the volume was created. After
// volumes are created, this field is ignored.
RequestedTopologies *CSITopologyRequest `hcl:"topology_request"`

// Topologies are the topologies returned by the storage provider,
// based on the RequestedTopologies and what the storage provider
// could support. This value cannot be set by the user.
Topologies []*CSITopology

AccessMode CSIVolumeAccessMode `hcl:"access_mode"`
AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"`
MountOptions *CSIMountOptions `hcl:"mount_options"`
Expand Down
7 changes: 6 additions & 1 deletion api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,13 @@ type NodeReservedNetworkResources struct {
ReservedHostPorts string
}

type CSITopologyRequest struct {
Required []*CSITopology `hcl:"required"`
Preferred []*CSITopology `hcl:"preferred"`
}

type CSITopology struct {
Segments map[string]string
Segments map[string]string `hcl:"segments"`
}

// CSINodeInfo is the fingerprinted data from a CSI Plugin that is specific to
Expand Down
6 changes: 6 additions & 0 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext

resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology))
for _, topo := range cresp.Volume.AccessibleTopology {
resp.Topologies = append(resp.Topologies,
&nstructs.CSITopology{Segments: topo.Segments})
}

return nil
}

Expand Down
46 changes: 30 additions & 16 deletions client/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,16 @@ type ClientCSIControllerDetachVolumeResponse struct{}
// Nomad client to tell a CSI controller plugin on that client to perform
// CreateVolume
type ClientCSIControllerCreateVolumeRequest struct {
Name string
VolumeCapabilities []*structs.CSIVolumeCapability
MountOptions *structs.CSIMountOptions
Parameters map[string]string
Secrets structs.CSISecrets
CapacityMin int64
CapacityMax int64
SnapshotID string
CloneID string
// TODO: topology is not yet supported
// TopologyRequirement
Name string
VolumeCapabilities []*structs.CSIVolumeCapability
MountOptions *structs.CSIMountOptions
Parameters map[string]string
Secrets structs.CSISecrets
CapacityMin int64
CapacityMax int64
SnapshotID string
CloneID string
RequestedTopologies *structs.CSITopologyRequest

CSIControllerQuery
}
Expand All @@ -237,8 +236,10 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
CloneID: req.CloneID,
SnapshotID: req.SnapshotID,
},
// TODO: topology is not yet supported
AccessibilityRequirements: &csi.TopologyRequirement{},
AccessibilityRequirements: &csi.TopologyRequirement{
Requisite: []*csi.Topology{},
Preferred: []*csi.Topology{},
},
}

// The CSI spec requires that at least one of the fields in CapacityRange
Expand All @@ -258,16 +259,29 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
}
creq.VolumeCapabilities = append(creq.VolumeCapabilities, ccap)
}

if req.RequestedTopologies != nil {
for _, topo := range req.RequestedTopologies.Required {
creq.AccessibilityRequirements.Requisite = append(
creq.AccessibilityRequirements.Requisite, &csi.Topology{
Segments: topo.Segments,
})
}
for _, topo := range req.RequestedTopologies.Preferred {
creq.AccessibilityRequirements.Preferred = append(
creq.AccessibilityRequirements.Preferred, &csi.Topology{
Segments: topo.Segments,
})
}
}
return creq, nil
}

type ClientCSIControllerCreateVolumeResponse struct {
ExternalVolumeID string
CapacityBytes int64
VolumeContext map[string]string

// TODO: topology is not yet supported
// AccessibleTopology []*Topology
Topologies []*structs.CSITopology
}

// ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a
Expand Down
15 changes: 12 additions & 3 deletions command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,13 @@ func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume {
ModifyIndex: vol.ModifyIndex,
}

if vol.RequestedTopologies != nil {
out.RequestedTopologies = &api.CSITopologyRequest{
Preferred: structsCSITopolgiesToApi(vol.RequestedTopologies.Preferred),
Required: structsCSITopolgiesToApi(vol.RequestedTopologies.Required),
}
}

// WriteAllocs and ReadAllocs will only ever contain the Allocation ID,
// with a null value for the Allocation; these IDs are mapped to
// allocation stubs in the Allocations field. This indirection is so the
Expand Down Expand Up @@ -717,9 +724,11 @@ func structsTaskEventToApi(te *structs.TaskEvent) *api.TaskEvent {
func structsCSITopolgiesToApi(tops []*structs.CSITopology) []*api.CSITopology {
out := make([]*api.CSITopology, 0, len(tops))
for _, t := range tops {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
if t != nil {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
}
}

return out
Expand Down
27 changes: 27 additions & 0 deletions command/plugin_status_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, erro
full = append(full, c.Colorize().Color("\n[bold]Node Capabilities[reset]"))
full = append(full, nodeCaps)
}
topos := c.formatTopology(plug.Nodes)
if topos != "" {
full = append(full, c.Colorize().Color("\n[bold]Accessible Topologies[reset]"))
full = append(full, topos)
}

}

// Format the allocs
Expand Down Expand Up @@ -183,6 +189,9 @@ func (c *PluginStatusCommand) formatControllerCaps(controllers map[string]*api.C
func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) string {
caps := []string{}
for _, node := range nodes {
if node.RequiresTopologies {
caps = append(caps, "VOLUME_ACCESSIBILITY_CONSTRAINTS")
}
switch info := node.NodeInfo; {
case info.RequiresNodeStageVolume:
caps = append(caps, "STAGE_UNSTAGE_VOLUME")
Expand All @@ -207,3 +216,21 @@ func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) stri

return " " + strings.Join(sort.StringSlice(caps), "\n ")
}

func (c *PluginStatusCommand) formatTopology(nodes map[string]*api.CSIInfo) string {
rows := []string{"Node ID|Accessible Topology"}
for nodeID, node := range nodes {
if node.NodeInfo.AccessibleTopology != nil {
segments := node.NodeInfo.AccessibleTopology.Segments
segmentPairs := make([]string, 0, len(segments))
for k, v := range segments {
segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v))
}
rows = append(rows, fmt.Sprintf("%s|%s", nodeID[:8], strings.Join(segmentPairs, ",")))
}
}
if len(rows) == 1 {
return ""
}
return formatList(rows)
}
42 changes: 41 additions & 1 deletion command/volume_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ capacity_max = "20G"
# capabilities to validate. Registering an existing volume will record but
# ignore these fields.
capability {
access_mode = "single-node-writer"
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

Expand All @@ -150,6 +150,18 @@ mount_options {
mount_flags = ["ro"]
}

# Optional: specify one or more locations where the volume must be accessible
# from. Refer to the plugin documentation for what segment values are supported.
topology_request {
preferred {
topology { segments { rack = "R1" } }
}
required {
topology { segments { rack = "R1" } }
topology { segments { rack = "R2", zone = "us-east-1a" } }
}
}

# Optional: provide any secrets specified by the plugin.
secrets {
example_secret = "xyzzy"
Expand Down Expand Up @@ -201,6 +213,34 @@ var defaultJsonVolumeSpec = strings.TrimSpace(`
]
}
],
"topology_request": {
"preferred": [
{
"topology": {
"segments": {
"rack": "R1"
}
}
}
],
"required": [
{
"topology": {
"segments": {
"rack": "R1"
}
}
},
{
"topology": {
"segments": {
"rack": "R2",
"zone": "us-east-1a"
}
}
}
]
},
"parameters": [
{
"skuname": "Premium_LRS"
Expand Down
43 changes: 43 additions & 0 deletions command/volume_register_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
delete(m, "mount_options")
delete(m, "capacity_max")
delete(m, "capacity_min")
delete(m, "topology_request")
delete(m, "type")

// Decode the rest
Expand Down Expand Up @@ -116,6 +117,48 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
}
}

requestedTopos := list.Filter("topology_request")
if len(requestedTopos.Items) > 0 {

vol.RequestedTopologies = &api.CSITopologyRequest{}

for _, o := range requestedTopos.Elem().Items {
if err := helper.CheckHCLKeys(o.Val, []string{"preferred", "required"}); err != nil {
return nil, err
}
ot, ok := o.Val.(*ast.ObjectType)
if !ok {
break
}

// topology_request -> required|preferred -> []topology -> []segments (kv)
decoded := map[string][]map[string][]map[string][]map[string]string{}
tgross marked this conversation as resolved.
Show resolved Hide resolved
if err := hcl.DecodeObject(&decoded, ot.List); err != nil {
return nil, err
}

getTopologies := func(topKey string) []*api.CSITopology {
for _, topo := range decoded[topKey] {
var topos []*api.CSITopology
for _, segments := range topo["topology"] {
for _, segment := range segments["segments"] {
if len(segment) > 0 {
topos = append(topos, &api.CSITopology{Segments: segment})
}
}
}
if len(topos) > 0 {
return topos
}
}
return nil
}

vol.RequestedTopologies.Required = getTopologies("required")
vol.RequestedTopologies.Preferred = getTopologies("preferred")
}
}

return vol, nil
}

Expand Down
41 changes: 41 additions & 0 deletions command/volume_register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ capability {
access_mode = "single-node-reader-only"
attachment_mode = "block-device"
}

topology_request {
preferred {
topology { segments {rack = "R1"} }
}

required {
topology { segments {rack = "R1"} }
topology { segments {rack = "R2", zone = "us-east-1a"} }
}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
Expand All @@ -108,6 +119,16 @@ capability {
},
Parameters: map[string]string{"skuname": "Premium_LRS"},
Secrets: map[string]string{"password": "xyzzy"},
RequestedTopologies: &api.CSITopologyRequest{
Required: []*api.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
Preferred: []*api.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
},
Topologies: nil, // this is left empty
},
err: "",
}, {
Expand All @@ -124,6 +145,19 @@ capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

topology_request {
# make sure we safely handle empty blocks even
# if they're invalid
preferred {
topology {}
topology { segments {} }
}

required {
topology { segments { rack = "R2", zone = "us-east-1a"} }
}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
Expand All @@ -136,6 +170,13 @@ capability {
AttachmentMode: api.CSIVolumeAttachmentModeFilesystem,
},
},
RequestedTopologies: &api.CSITopologyRequest{
Required: []*api.CSITopology{
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
Preferred: nil,
},
Topologies: nil,
},
err: "",
},
Expand Down
Loading