Skip to content

Commit

Permalink
CSI: implement support for topology
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross committed Feb 24, 2022
1 parent 21aa764 commit 1ba340b
Show file tree
Hide file tree
Showing 24 changed files with 356 additions and 53 deletions.
2 changes: 1 addition & 1 deletion api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type CSIVolume struct {
Name string
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string
Topologies []*CSITopology
Topologies []*CSITopology `hcl:"required_topology"`
AccessMode CSIVolumeAccessMode `hcl:"access_mode"`
AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"`
MountOptions *CSIMountOptions `hcl:"mount_options"`
Expand Down
6 changes: 6 additions & 0 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext

resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology))
for _, topo := range cresp.Volume.AccessibleTopology {
resp.Topologies = append(resp.Topologies,
&nstructs.CSITopology{Segments: topo.Segments})
}

return nil
}

Expand Down
20 changes: 13 additions & 7 deletions client/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ type ClientCSIControllerCreateVolumeRequest struct {
CapacityMax int64
SnapshotID string
CloneID string
// TODO: topology is not yet supported
// TopologyRequirement
Topologies []*structs.CSITopology

CSIControllerQuery
}
Expand All @@ -237,8 +236,9 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
CloneID: req.CloneID,
SnapshotID: req.SnapshotID,
},
// TODO: topology is not yet supported
AccessibilityRequirements: &csi.TopologyRequirement{},
AccessibilityRequirements: &csi.TopologyRequirement{
Requisite: []*csi.Topology{},
},
}

// The CSI spec requires that at least one of the fields in CapacityRange
Expand All @@ -258,16 +258,22 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
}
creq.VolumeCapabilities = append(creq.VolumeCapabilities, ccap)
}

for _, topo := range req.Topologies {
creq.AccessibilityRequirements.Requisite = append(
creq.AccessibilityRequirements.Requisite, &csi.Topology{
Segments: topo.Segments,
})
}

return creq, nil
}

type ClientCSIControllerCreateVolumeResponse struct {
ExternalVolumeID string
CapacityBytes int64
VolumeContext map[string]string

// TODO: topology is not yet supported
// AccessibleTopology []*Topology
Topologies []*structs.CSITopology
}

// ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a
Expand Down
33 changes: 32 additions & 1 deletion command/plugin_status_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, erro
full = append(full, c.Colorize().Color("\n[bold]Node Capabilities[reset]"))
full = append(full, nodeCaps)
}
topos := c.formatTopology(plug.Nodes)
if topos != "" {
full = append(full, c.Colorize().Color("\n[bold]Accessible Topologies[reset]"))
full = append(full, topos)
}

}

// Format the allocs
Expand Down Expand Up @@ -177,12 +183,16 @@ func (c *PluginStatusCommand) formatControllerCaps(controllers map[string]*api.C
return ""
}

return strings.Join(caps, "\n\t")
return " " + strings.Join(caps, "\n ")
}

func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) string {
caps := []string{}
for _, node := range nodes {
// TODO: move this up to top-level k/v?
if node.RequiresTopologies {
caps = append(caps, "VOLUME_ACCESSIBILITY_CONSTRAINTS")
}
switch info := node.NodeInfo; {
case info.RequiresNodeStageVolume:
caps = append(caps, "STAGE_UNSTAGE_VOLUME")
Expand All @@ -207,3 +217,24 @@ func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) stri

return " " + strings.Join(caps, "\n ")
}

func (c *PluginStatusCommand) formatTopology(nodes map[string]*api.CSIInfo) string {
topos := []string{}
for nodeID, node := range nodes {
if node.NodeInfo.AccessibleTopology != nil {
segments := node.NodeInfo.AccessibleTopology.Segments
segmentPairs := make([]string, 0, len(segments))
for k, v := range segments {
segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v))
}
topos = append(topos, fmt.Sprintf("Node %s: %v",
nodeID[:8], strings.Join(segmentPairs, ",")))
}
}

if len(topos) == 0 {
return ""
}

return " " + strings.Join(topos, "\n ")
}
18 changes: 18 additions & 0 deletions command/volume_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ mount_options {
mount_flags = ["ro"]
}
# Optional: specify one or more locations where the volume must be accessible
# from. Refer to the plugin documentation for what segment values are supported.
required_topology {
segments {rack = "R1", zone = "us-east-1a"}
}
required_topology {
segments {rack = "R2", zone = "us-east-1b"}
}
# Optional: provide any secrets specified by the plugin.
secrets {
example_secret = "xyzzy"
Expand Down Expand Up @@ -201,6 +211,14 @@ var defaultJsonVolumeSpec = strings.TrimSpace(`
]
}
],
"require_topologies": [
{
"segments": {"rack": "R1", "zone": "us-east-1a"}
},
{
"segments": {"rack": "R2", "zone": "us-east-1b"}
}
]
"parameters": [
{
"skuname": "Premium_LRS"
Expand Down
27 changes: 27 additions & 0 deletions command/volume_register_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
delete(m, "mount_options")
delete(m, "capacity_max")
delete(m, "capacity_min")
delete(m, "required_topology")
delete(m, "type")

// Decode the rest
Expand Down Expand Up @@ -116,6 +117,32 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
}
}

requiredTopos := list.Filter("required_topology")
if len(requiredTopos.Items) > 0 {

for _, o := range requiredTopos.Elem().Items {
if err := helper.CheckHCLKeys(o.Val, []string{"segments"}); err != nil {
return nil, err
}

ot, ok := o.Val.(*ast.ObjectType)
if !ok {
break
}

var m map[string]interface{}
if err := hcl.DecodeObject(&m, ot.List); err != nil {
return nil, err
}
var topo *api.CSITopology
if err := mapstructure.WeakDecode(&m, &topo); err != nil {
return nil, err
}

vol.Topologies = append(vol.Topologies, topo)
}
}

return vol, nil
}

Expand Down
23 changes: 23 additions & 0 deletions command/volume_register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ capability {
access_mode = "single-node-reader-only"
attachment_mode = "block-device"
}
required_topology {
segments {rack = "R1"}
}
required_topology {
segments {rack = "R2", zone = "us-east-1a"}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
Expand All @@ -108,6 +116,10 @@ capability {
},
Parameters: map[string]string{"skuname": "Premium_LRS"},
Secrets: map[string]string{"password": "xyzzy"},
Topologies: []*api.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
},
err: "",
}, {
Expand All @@ -124,6 +136,14 @@ capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}
# make sure we safely handle empty blocks even
# if they're invalid
require_topology {}
required_topology {
segments {rack = "R2", zone = "us-east-1a"}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
Expand All @@ -136,6 +156,9 @@ capability {
AttachmentMode: api.CSIVolumeAttachmentModeFilesystem,
},
},
Topologies: []*api.CSITopology{
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
},
err: "",
},
Expand Down
13 changes: 12 additions & 1 deletion command/volume_status_csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,21 @@ func (c *VolumeStatusCommand) formatBasic(vol *api.CSIVolume) (string, error) {
return formatKV(output), nil
}

full := []string{formatKV(output)}

if len(vol.Topologies) > 0 {
topoBanner := c.Colorize().Color("\n[bold]Topologies[reset]")
topo := c.formatTopologies(vol)
full = append(full, topoBanner)
full = append(full, topo)
}

// Format the allocs
banner := c.Colorize().Color("\n[bold]Allocations[reset]")
allocs := formatAllocListStubs(vol.Allocations, c.verbose, c.length)
full := []string{formatKV(output), banner, allocs}
full = append(full, banner)
full = append(full, allocs)

return strings.Join(full, "\n"), nil
}

Expand Down
7 changes: 7 additions & 0 deletions e2e/csi/input/ebs-volume0.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@ capability {
parameters {
type = "gp2"
}

required_topology {
segments {
# this zone should match the one set in e2e/terraform/variables.tf
"topology.ebs.csi.aws.com/zone" = "us-east-1b"
}
}
7 changes: 7 additions & 0 deletions e2e/csi/input/ebs-volume1.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@ capability {
parameters {
type = "gp2"
}

required_topology {
segments {
# this zone should match the one set in e2e/terraform/variables.tf
"topology.ebs.csi.aws.com/zone" = "us-east-1b"
}
}
2 changes: 1 addition & 1 deletion e2e/csi/input/plugin-aws-ebs-controller.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ job "plugin-aws-ebs-controller" {
driver = "docker"

config {
image = "amazon/aws-ebs-csi-driver:v0.9.0"
image = "public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1"

args = [
"controller",
Expand Down
2 changes: 1 addition & 1 deletion e2e/csi/input/plugin-aws-ebs-nodes.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ job "plugin-aws-ebs-nodes" {
driver = "docker"

config {
image = "amazon/aws-ebs-csi-driver:v0.9.0"
image = "public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1"

args = [
"node",
Expand Down
2 changes: 1 addition & 1 deletion e2e/terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ variable "region" {

variable "availability_zone" {
description = "The AWS availability zone to deploy to."
default = "us-east-1a"
default = "us-east-1b"
}

variable "instance_type" {
Expand Down
2 changes: 2 additions & 0 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,7 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
CapacityMax: vol.RequestedCapacityMax,
SnapshotID: vol.SnapshotID,
CloneID: vol.CloneID,
Topologies: vol.Topologies,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerCreateVolumeResponse{}
Expand All @@ -918,6 +919,7 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
vol.ExternalID = cResp.ExternalVolumeID
vol.Capacity = cResp.CapacityBytes
vol.Context = cResp.VolumeContext
vol.Topologies = cResp.Topologies
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,9 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
ExternalVolumeID: "vol-12345",
CapacityBytes: 42,
VolumeContext: map[string]string{"plugincontext": "bar"},
Topologies: []*structs.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
}

client, cleanup := client.TestClientWithRPCs(t,
Expand Down Expand Up @@ -829,6 +832,10 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
},
Topologies: []*structs.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"zone": "Z2"}},
},
}}

// Create the create request
Expand Down Expand Up @@ -872,6 +879,7 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
require.Equal(t, int64(42), vol.Capacity)
require.Equal(t, "bar", vol.Context["plugincontext"])
require.Equal(t, "", vol.Context["mycontext"])
require.Equal(t, map[string]string{"rack": "R1"}, vol.Topologies[0].Segments)
}

func TestCSIVolumeEndpoint_Delete(t *testing.T) {
Expand Down
Loading

0 comments on commit 1ba340b

Please sign in to comment.