diff --git a/.changelog/12129.txt b/.changelog/12129.txt new file mode 100644 index 000000000000..246899e83433 --- /dev/null +++ b/.changelog/12129.txt @@ -0,0 +1,3 @@ +```release-note:improvement +csi: Added support for storage topology +``` diff --git a/api/csi.go b/api/csi.go index 316841f25081..1950e4ba7ab4 100644 --- a/api/csi.go +++ b/api/csi.go @@ -252,11 +252,21 @@ func (w *WriteOptions) SetHeadersFromCSISecrets(secrets CSISecrets) { // CSIVolume is used for serialization, see also nomad/structs/csi.go type CSIVolume struct { - ID string - Name string - ExternalID string `mapstructure:"external_id" hcl:"external_id"` - Namespace string - Topologies []*CSITopology + ID string + Name string + ExternalID string `mapstructure:"external_id" hcl:"external_id"` + Namespace string + + // RequestedTopologies are the topologies submitted as options to + // the storage provider at the time the volume was created. After + // volumes are created, this field is ignored. + RequestedTopologies *CSITopologyRequest `hcl:"topology_request"` + + // Topologies are the topologies returned by the storage provider, + // based on the RequestedTopologies and what the storage provider + // could support. This value cannot be set by the user. + Topologies []*CSITopology + AccessMode CSIVolumeAccessMode `hcl:"access_mode"` AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"` MountOptions *CSIMountOptions `hcl:"mount_options"` diff --git a/api/nodes.go b/api/nodes.go index 3fd24d81a9cd..0a9ed2833abe 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -614,8 +614,13 @@ type NodeReservedNetworkResources struct { ReservedHostPorts string } +type CSITopologyRequest struct { + Required []*CSITopology `hcl:"required"` + Preferred []*CSITopology `hcl:"preferred"` +} + type CSITopology struct { - Segments map[string]string + Segments map[string]string `hcl:"segments"` } // CSINodeInfo is the fingerprinted data from a CSI Plugin that is specific to diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index 029c91ede62a..2438edce0a1b 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -216,6 +216,12 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum resp.CapacityBytes = cresp.Volume.CapacityBytes resp.VolumeContext = cresp.Volume.VolumeContext + // allocate with zero length so append does not leave leading nil entries + resp.Topologies = make([]*nstructs.CSITopology, 0, len(cresp.Volume.AccessibleTopology)) + for _, topo := range cresp.Volume.AccessibleTopology { + resp.Topologies = append(resp.Topologies, + &nstructs.CSITopology{Segments: topo.Segments}) + } + return nil } diff --git a/client/structs/csi.go b/client/structs/csi.go index fb6ca6c7aeee..642f02b013f2 100644 --- a/client/structs/csi.go +++ b/client/structs/csi.go @@ -211,17 +211,16 @@ type ClientCSIControllerDetachVolumeResponse struct{} // Nomad client to tell a CSI controller plugin on that client to perform // CreateVolume type ClientCSIControllerCreateVolumeRequest struct { - Name string - VolumeCapabilities []*structs.CSIVolumeCapability - MountOptions *structs.CSIMountOptions - Parameters map[string]string - Secrets structs.CSISecrets - CapacityMin int64 - CapacityMax int64 - SnapshotID string - CloneID string - // TODO: topology is not yet supported - // TopologyRequirement + Name string + VolumeCapabilities []*structs.CSIVolumeCapability + MountOptions *structs.CSIMountOptions + Parameters map[string]string + Secrets structs.CSISecrets + CapacityMin int64 + CapacityMax int64 + SnapshotID string + CloneID string + RequestedTopologies *structs.CSITopologyRequest
CSIControllerQuery } @@ -237,8 +236,10 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll CloneID: req.CloneID, SnapshotID: req.SnapshotID, }, - // TODO: topology is not yet supported - AccessibilityRequirements: &csi.TopologyRequirement{}, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{}, + Preferred: []*csi.Topology{}, + }, } // The CSI spec requires that at least one of the fields in CapacityRange @@ -258,6 +259,21 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll } creq.VolumeCapabilities = append(creq.VolumeCapabilities, ccap) } + + if req.RequestedTopologies != nil { + for _, topo := range req.RequestedTopologies.Required { + creq.AccessibilityRequirements.Requisite = append( + creq.AccessibilityRequirements.Requisite, &csi.Topology{ + Segments: topo.Segments, + }) + } + for _, topo := range req.RequestedTopologies.Preferred { + creq.AccessibilityRequirements.Preferred = append( + creq.AccessibilityRequirements.Preferred, &csi.Topology{ + Segments: topo.Segments, + }) + } + } return creq, nil } @@ -265,9 +281,7 @@ type ClientCSIControllerCreateVolumeResponse struct { ExternalVolumeID string CapacityBytes int64 VolumeContext map[string]string - - // TODO: topology is not yet supported - // AccessibleTopology []*Topology + Topologies []*structs.CSITopology } // ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a diff --git a/command/agent/csi_endpoint.go b/command/agent/csi_endpoint.go index b23101fba555..4e0a8ea08f5e 100644 --- a/command/agent/csi_endpoint.go +++ b/command/agent/csi_endpoint.go @@ -508,6 +508,13 @@ func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume { ModifyIndex: vol.ModifyIndex, } + if vol.RequestedTopologies != nil { + out.RequestedTopologies = &api.CSITopologyRequest{ + Preferred: structsCSITopolgiesToApi(vol.RequestedTopologies.Preferred), + Required: structsCSITopolgiesToApi(vol.RequestedTopologies.Required), + } + } + // WriteAllocs and ReadAllocs will only ever contain the Allocation ID, // with a null value for the Allocation; these IDs are mapped to // allocation stubs in the Allocations field. 
This indirection is so the @@ -725,9 +732,11 @@ func structsTaskEventToApi(te *structs.TaskEvent) *api.TaskEvent { func structsCSITopolgiesToApi(tops []*structs.CSITopology) []*api.CSITopology { out := make([]*api.CSITopology, 0, len(tops)) for _, t := range tops { - out = append(out, &api.CSITopology{ - Segments: t.Segments, - }) + if t != nil { + out = append(out, &api.CSITopology{ + Segments: t.Segments, + }) + } } return out diff --git a/command/plugin_status_csi.go b/command/plugin_status_csi.go index 3ef4d0ee8716..b3a6ac36b39f 100644 --- a/command/plugin_status_csi.go +++ b/command/plugin_status_csi.go @@ -118,6 +118,12 @@ func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, erro full = append(full, c.Colorize().Color("\n[bold]Node Capabilities[reset]")) full = append(full, nodeCaps) } + topos := c.formatTopology(plug.Nodes) + if topos != "" { + full = append(full, c.Colorize().Color("\n[bold]Accessible Topologies[reset]")) + full = append(full, topos) + } + } // Format the allocs @@ -183,6 +189,9 @@ func (c *PluginStatusCommand) formatControllerCaps(controllers map[string]*api.C func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) string { caps := []string{} for _, node := range nodes { + if node.RequiresTopologies { + caps = append(caps, "VOLUME_ACCESSIBILITY_CONSTRAINTS") + } switch info := node.NodeInfo; { case info.RequiresNodeStageVolume: caps = append(caps, "STAGE_UNSTAGE_VOLUME") @@ -207,3 +216,21 @@ func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) stri return " " + strings.Join(sort.StringSlice(caps), "\n ") } + +func (c *PluginStatusCommand) formatTopology(nodes map[string]*api.CSIInfo) string { + rows := []string{"Node ID|Accessible Topology"} + for nodeID, node := range nodes { + if node.NodeInfo.AccessibleTopology != nil { + segments := node.NodeInfo.AccessibleTopology.Segments + segmentPairs := make([]string, 0, len(segments)) + for k, v := range segments { + segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v)) + } + rows = append(rows, fmt.Sprintf("%s|%s", nodeID[:8], strings.Join(segmentPairs, ","))) + } + } + if len(rows) == 1 { + return "" + } + return formatList(rows) +} diff --git a/command/volume_init.go b/command/volume_init.go index c1714f7bf345..ce0ffba3c31d 100644 --- a/command/volume_init.go +++ b/command/volume_init.go @@ -133,7 +133,7 @@ capacity_max = "20G" # capabilities to validate. Registering an existing volume will record but # ignore these fields. capability { - access_mode = "single-node-writer" + access_mode = "single-node-writer" attachment_mode = "file-system" } @@ -150,6 +150,18 @@ mount_options { mount_flags = ["ro"] } +# Optional: specify one or more locations where the volume must be accessible +# from. Refer to the plugin documentation for what segment values are supported. +topology_request { + preferred { + topology { segments { rack = "R1" } } + } + required { + topology { segments { rack = "R1" } } + topology { segments { rack = "R2", zone = "us-east-1a" } } + } +} + # Optional: provide any secrets specified by the plugin. 
secrets { example_secret = "xyzzy" @@ -201,6 +213,34 @@ var defaultJsonVolumeSpec = strings.TrimSpace(` ] } ], + "topology_request": { + "preferred": [ + { + "topology": { + "segments": { + "rack": "R1" + } + } + } + ], + "required": [ + { + "topology": { + "segments": { + "rack": "R1" + } + } + }, + { + "topology": { + "segments": { + "rack": "R2", + "zone": "us-east-1a" + } + } + } + ] + }, "parameters": [ { "skuname": "Premium_LRS" diff --git a/command/volume_register_csi.go b/command/volume_register_csi.go index aa68d63516e6..b3cf9e2fe626 100644 --- a/command/volume_register_csi.go +++ b/command/volume_register_csi.go @@ -48,6 +48,7 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) { delete(m, "mount_options") delete(m, "capacity_max") delete(m, "capacity_min") + delete(m, "topology_request") delete(m, "type") // Decode the rest @@ -116,6 +117,48 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) { } } + requestedTopos := list.Filter("topology_request") + if len(requestedTopos.Items) > 0 { + + vol.RequestedTopologies = &api.CSITopologyRequest{} + + for _, o := range requestedTopos.Elem().Items { + if err := helper.CheckHCLKeys(o.Val, []string{"preferred", "required"}); err != nil { + return nil, err + } + ot, ok := o.Val.(*ast.ObjectType) + if !ok { + break + } + + // topology_request -> required|preferred -> []topology -> []segments (kv) + decoded := map[string][]map[string][]map[string][]map[string]string{} + if err := hcl.DecodeObject(&decoded, ot.List); err != nil { + return nil, err + } + + getTopologies := func(topKey string) []*api.CSITopology { + for _, topo := range decoded[topKey] { + var topos []*api.CSITopology + for _, segments := range topo["topology"] { + for _, segment := range segments["segments"] { + if len(segment) > 0 { + topos = append(topos, &api.CSITopology{Segments: segment}) + } + } + } + if len(topos) > 0 { + return topos + } + } + return nil + } + + vol.RequestedTopologies.Required = getTopologies("required") + vol.RequestedTopologies.Preferred = getTopologies("preferred") + } + } + return vol, nil } diff --git a/command/volume_register_test.go b/command/volume_register_test.go index b69816bfae6b..b65b923cd547 100644 --- a/command/volume_register_test.go +++ b/command/volume_register_test.go @@ -84,6 +84,17 @@ capability { access_mode = "single-node-reader-only" attachment_mode = "block-device" } + +topology_request { + preferred { + topology { segments {rack = "R1"} } + } + + required { + topology { segments {rack = "R1"} } + topology { segments {rack = "R2", zone = "us-east-1a"} } + } +} `, expected: &api.CSIVolume{ ID: "testvolume", @@ -108,6 +119,16 @@ capability { }, Parameters: map[string]string{"skuname": "Premium_LRS"}, Secrets: map[string]string{"password": "xyzzy"}, + RequestedTopologies: &api.CSITopologyRequest{ + Required: []*api.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}}, + }, + Preferred: []*api.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, + }, + Topologies: nil, // this is left empty }, err: "", }, { @@ -124,6 +145,19 @@ capability { access_mode = "single-node-writer" attachment_mode = "file-system" } + +topology_request { + # make sure we safely handle empty blocks even + # if they're invalid + preferred { + topology {} + topology { segments {} } + } + + required { + topology { segments { rack = "R2", zone = "us-east-1a"} } + } +} `, expected: &api.CSIVolume{ ID: "testvolume", @@ -136,6 +170,13 @@ capability { 
AttachmentMode: api.CSIVolumeAttachmentModeFilesystem, }, }, + RequestedTopologies: &api.CSITopologyRequest{ + Required: []*api.CSITopology{ + {Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}}, + }, + Preferred: nil, + }, + Topologies: nil, }, err: "", }, diff --git a/command/volume_status_csi.go b/command/volume_status_csi.go index 36724d34abf7..84a4abb096a1 100644 --- a/command/volume_status_csi.go +++ b/command/volume_status_csi.go @@ -212,43 +212,42 @@ func (c *VolumeStatusCommand) formatBasic(vol *api.CSIVolume) (string, error) { return formatKV(output), nil } + full := []string{formatKV(output)} + + if len(vol.Topologies) > 0 { + topoBanner := c.Colorize().Color("\n[bold]Topologies[reset]") + topo := c.formatTopology(vol) + full = append(full, topoBanner) + full = append(full, topo) + } + // Format the allocs banner := c.Colorize().Color("\n[bold]Allocations[reset]") allocs := formatAllocListStubs(vol.Allocations, c.verbose, c.length) - full := []string{formatKV(output), banner, allocs} + full = append(full, banner) + full = append(full, allocs) + return strings.Join(full, "\n"), nil } -func (c *VolumeStatusCommand) formatTopologies(vol *api.CSIVolume) string { - var out []string - - // Find the union of all the keys - head := map[string]string{} - for _, t := range vol.Topologies { - for key := range t.Segments { - if _, ok := head[key]; !ok { - head[key] = "" - } +func (c *VolumeStatusCommand) formatTopology(vol *api.CSIVolume) string { + rows := []string{"Topology|Segments"} + for i, t := range vol.Topologies { + segmentPairs := make([]string, 0, len(t.Segments)) + for k, v := range t.Segments { + segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v)) } + // note: this looks awkward because we don't have any other + // place where we list collections of arbitrary k/v's like + // this without just dumping JSON formatted outputs. 
It's likely + // the spec will expand to add extra fields, in which case we'll + // add them here and drop the first column + rows = append(rows, fmt.Sprintf("%02d|%v", i, strings.Join(segmentPairs, ", "))) } - - // Append the header - var line []string - for key := range head { - line = append(line, key) + if len(rows) == 1 { + return "" } - out = append(out, strings.Join(line, " ")) - - // Append each topology - for _, t := range vol.Topologies { - line = []string{} - for key := range head { - line = append(line, t.Segments[key]) - } - out = append(out, strings.Join(line, " ")) - } - - return strings.Join(out, "\n") + return formatList(rows) } func csiVolMountOption(volume, request *api.CSIMountOptions) string { diff --git a/e2e/csi/csi.go b/e2e/csi/csi.go index d500b6d2c069..e570f7383b89 100644 --- a/e2e/csi/csi.go +++ b/e2e/csi/csi.go @@ -226,9 +226,13 @@ func volumeRegister(volID, volFilePath, createOrRegister string) error { } // hack off the first line to replace with our unique ID - var re = regexp.MustCompile(`(?m)^id ".*"`) - volspec := re.ReplaceAllString(string(content), - fmt.Sprintf("id = \"%s\"", volID)) + var idRegex = regexp.MustCompile(`(?m)^id ".*"`) + volspec := idRegex.ReplaceAllString(string(content), + fmt.Sprintf("id = %q", volID)) + + var nameRegex = regexp.MustCompile(`(?m)^name ".*"`) + volspec = nameRegex.ReplaceAllString(volspec, + fmt.Sprintf("name = %q", volID)) go func() { defer stdin.Close() diff --git a/e2e/csi/input/ebs-volume0.hcl b/e2e/csi/input/ebs-volume0.hcl index bf961efedd67..b3e8fd93ddca 100644 --- a/e2e/csi/input/ebs-volume0.hcl +++ b/e2e/csi/input/ebs-volume0.hcl @@ -1,5 +1,5 @@ id = "ebs-vol[0]" -name = "this-is-a-test-0" # CSIVolumeName tag +name = "idempotency-token" # CSIVolumeName tag, must be idempotent type = "csi" plugin_id = "aws-ebs0" @@ -19,3 +19,14 @@ capability { parameters { type = "gp2" } + +topology_request { + required { + topology { + segments { + # this zone should match the one set in e2e/terraform/variables.tf + "topology.ebs.csi.aws.com/zone" = "us-east-1b" + } + } + } +} diff --git a/e2e/csi/input/ebs-volume1.hcl b/e2e/csi/input/ebs-volume1.hcl index df38b9034727..57f715a78dbd 100644 --- a/e2e/csi/input/ebs-volume1.hcl +++ b/e2e/csi/input/ebs-volume1.hcl @@ -1,5 +1,5 @@ id = "ebs-vol[1]" -name = "this-is-a-test-1" # CSIVolumeName tag +name = "idempotency-token" # CSIVolumeName tag type = "csi" plugin_id = "aws-ebs0" @@ -19,3 +19,14 @@ capability { parameters { type = "gp2" } + +topology_request { + required { + topology { + segments { + # this zone should match the one set in e2e/terraform/variables.tf + "topology.ebs.csi.aws.com/zone" = "us-east-1b" + } + } + } +} diff --git a/e2e/csi/input/plugin-aws-ebs-controller.nomad b/e2e/csi/input/plugin-aws-ebs-controller.nomad index b4bd6d626ac3..dd0b675c79d2 100644 --- a/e2e/csi/input/plugin-aws-ebs-controller.nomad +++ b/e2e/csi/input/plugin-aws-ebs-controller.nomad @@ -22,7 +22,7 @@ job "plugin-aws-ebs-controller" { driver = "docker" config { - image = "amazon/aws-ebs-csi-driver:v0.9.0" + image = "public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1" args = [ "controller", diff --git a/e2e/csi/input/plugin-aws-ebs-nodes.nomad b/e2e/csi/input/plugin-aws-ebs-nodes.nomad index 3411990a3168..206b1df81f0c 100644 --- a/e2e/csi/input/plugin-aws-ebs-nodes.nomad +++ b/e2e/csi/input/plugin-aws-ebs-nodes.nomad @@ -19,7 +19,7 @@ job "plugin-aws-ebs-nodes" { driver = "docker" config { - image = "amazon/aws-ebs-csi-driver:v0.9.0" + image = 
"public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1" args = [ "node", diff --git a/e2e/terraform/variables.tf b/e2e/terraform/variables.tf index c48c6ef67f41..49f8f879057a 100644 --- a/e2e/terraform/variables.tf +++ b/e2e/terraform/variables.tf @@ -10,7 +10,7 @@ variable "region" { variable "availability_zone" { description = "The AWS availability zone to deploy to." - default = "us-east-1a" + default = "us-east-1b" } variable "instance_type" { diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index ac63b8fa5659..092fc16ffd0a 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -301,6 +301,14 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru if err := v.controllerValidateVolume(args, vol, plugin); err != nil { return err } + + // The topologies for the volume have already been set when it was + // created, so we accept the user's description of that topology + if vol.Topologies == nil || len(vol.Topologies) == 0 { + if vol.RequestedTopologies != nil { + vol.Topologies = vol.RequestedTopologies.Required + } + } } resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args) @@ -898,15 +906,16 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug method := "ClientCSI.ControllerCreateVolume" cReq := &cstructs.ClientCSIControllerCreateVolumeRequest{ - Name: vol.Name, - VolumeCapabilities: vol.RequestedCapabilities, - MountOptions: vol.MountOptions, - Parameters: vol.Parameters, - Secrets: vol.Secrets, - CapacityMin: vol.RequestedCapacityMin, - CapacityMax: vol.RequestedCapacityMax, - SnapshotID: vol.SnapshotID, - CloneID: vol.CloneID, + Name: vol.Name, + VolumeCapabilities: vol.RequestedCapabilities, + MountOptions: vol.MountOptions, + Parameters: vol.Parameters, + Secrets: vol.Secrets, + CapacityMin: vol.RequestedCapacityMin, + CapacityMax: vol.RequestedCapacityMax, + SnapshotID: vol.SnapshotID, + CloneID: vol.CloneID, + RequestedTopologies: vol.RequestedTopologies, } cReq.PluginID = plugin.ID cResp := &cstructs.ClientCSIControllerCreateVolumeResponse{} @@ -918,6 +927,7 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug vol.ExternalID = cResp.ExternalVolumeID vol.Capacity = cResp.CapacityBytes vol.Context = cResp.VolumeContext + vol.Topologies = cResp.Topologies return nil } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 0d06a85910ae..e4235ee213e8 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -275,9 +275,10 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) { ID: id0, Namespace: structs.DefaultNamespace, PluginID: "minnie", - Topologies: []*structs.CSITopology{{ - Segments: map[string]string{"foo": "bar"}, - }}, + RequestedTopologies: &structs.CSITopologyRequest{ + Required: []*structs.CSITopology{ + {Segments: map[string]string{"foo": "bar"}}}, + }, Secrets: structs.CSISecrets{"mysecret": "secretvalue"}, RequestedCapabilities: []*structs.CSIVolumeCapability{{ AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, @@ -754,6 +755,9 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) { ExternalVolumeID: "vol-12345", CapacityBytes: 42, VolumeContext: map[string]string{"plugincontext": "bar"}, + Topologies: []*structs.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, } client, cleanup := client.TestClientWithRPCs(t, @@ -829,6 +833,10 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, }, }, + Topologies: 
[]*structs.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"zone": "Z2"}}, + }, }} // Create the create request @@ -872,6 +880,7 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) { require.Equal(t, int64(42), vol.Capacity) require.Equal(t, "bar", vol.Context["plugincontext"]) require.Equal(t, "", vol.Context["mycontext"]) + require.Equal(t, map[string]string{"rack": "R1"}, vol.Topologies[0].Segments) } func TestCSIVolumeEndpoint_Delete(t *testing.T) { diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index 9540d8445b26..3d7c5178fc5a 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -246,9 +246,19 @@ type CSIVolume struct { // Name is a display name for the volume, not required to be unique Name string // ExternalID identifies the volume for the CSI interface, may be URL unsafe - ExternalID string - Namespace string - Topologies []*CSITopology + ExternalID string + Namespace string + + // RequestedTopologies are the topologies submitted as options to + // the storage provider at the time the volume was created. After + // volumes are created, this field is ignored. + RequestedTopologies *CSITopologyRequest + + // Topologies are the topologies returned by the storage provider, + // based on the RequestedTopologies and what the storage provider + // could support. This value cannot be set by the user. + Topologies []*CSITopology + AccessMode CSIVolumeAccessMode // *current* access mode AttachmentMode CSIVolumeAttachmentMode // *current* attachment mode MountOptions *CSIMountOptions @@ -679,20 +689,18 @@ func (v *CSIVolume) Validate() error { if len(v.RequestedCapabilities) == 0 { errs = append(errs, "must include at least one capability block") } - - // TODO: Volume Topologies are optional - We should check to see if the plugin - // the volume is being registered with requires them. 
- // var ok bool - // for _, t := range v.Topologies { - // if t != nil && len(t.Segments) > 0 { - // ok = true - // break - // } - // } - // if !ok { - // errs = append(errs, "missing topology") - // } - + if v.RequestedTopologies != nil { + for _, t := range v.RequestedTopologies.Required { + if t != nil && len(t.Segments) == 0 { + errs = append(errs, "required topology is missing segments field") + } + } + for _, t := range v.RequestedTopologies.Preferred { + if t != nil && len(t.Segments) == 0 { + errs = append(errs, "preferred topology is missing segments field") + } + } + } if len(errs) > 0 { return fmt.Errorf("validation: %s", strings.Join(errs, ", ")) } @@ -836,9 +844,6 @@ type CSIVolumeExternalStub struct { CloneID string SnapshotID string - // TODO: topology support - // AccessibleTopology []*Topology - PublishedExternalNodeIDs []string IsAbnormal bool Status string diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 32d150816a19..855a65871b47 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -554,6 +554,21 @@ func TestVolume_Copy(t *testing.T) { } +func TestCSIVolume_Validate(t *testing.T) { + vol := &CSIVolume{ + ID: "test", + PluginID: "test", + SnapshotID: "test-snapshot", + CloneID: "test-clone", + RequestedTopologies: &CSITopologyRequest{ + Required: []*CSITopology{{}, {}}, + }, + } + err := vol.Validate() + require.EqualError(t, err, "validation: missing namespace, only one of snapshot_id and clone_id is allowed, must include at least one capability block, required topology is missing segments field, required topology is missing segments field") + +} + func TestCSIPluginJobs(t *testing.T) { plug := NewCSIPlugin("foo", 1000) controller := &Job{ diff --git a/nomad/structs/node.go b/nomad/structs/node.go index 71be7796f28f..6a8f9cd47a56 100644 --- a/nomad/structs/node.go +++ b/nomad/structs/node.go @@ -62,6 +62,14 @@ func (t *CSITopology) Equal(o *CSITopology) bool { return helper.CompareMapStringString(t.Segments, o.Segments) } +// CSITopologyRequest are the topologies submitted as options to the +// storage provider at the time the volume was created. The storage +// provider will return a single topology. +type CSITopologyRequest struct { + Required []*CSITopology + Preferred []*CSITopology +} + // CSINodeInfo is the fingerprinted data from a CSI Plugin that is specific to // the Node API. 
type CSINodeInfo struct { diff --git a/plugins/csi/client.go b/plugins/csi/client.go index 6d34d8f55aa2..8c5185bbefff 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -740,6 +740,11 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) result.MaxVolumes = math.MaxInt64 } + topo := resp.GetAccessibleTopology() + if topo != nil { + result.AccessibleTopology = &Topology{Segments: topo.Segments} + } + return result, nil } diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 1c951b2bac96..6554662c77d7 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -746,7 +746,7 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { }, { - Name: "handles success with capacity range and source", + Name: "handles success with capacity range, source, and topology", CapacityRange: &CapacityRange{ RequiredBytes: 500, LimitBytes: 1000, @@ -764,6 +764,9 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { }, }, }, + AccessibleTopology: []*csipbv1.Topology{ + {Segments: map[string]string{"rack": "R1"}}, + }, }, }, }, @@ -782,10 +785,19 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { AccessMode: VolumeAccessModeMultiNodeMultiWriter, }, }, - Parameters: map[string]string{}, - Secrets: structs.CSISecrets{}, - ContentSource: tc.ContentSource, - AccessibilityRequirements: &TopologyRequirement{}, + Parameters: map[string]string{}, + Secrets: structs.CSISecrets{}, + ContentSource: tc.ContentSource, + AccessibilityRequirements: &TopologyRequirement{ + Requisite: []*Topology{ + { + Segments: map[string]string{"rack": "R1"}, + }, + { + Segments: map[string]string{"rack": "R2"}, + }, + }, + }, } cc.NextCreateVolumeResponse = tc.Response @@ -808,6 +820,14 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { require.Equal(t, tc.ContentSource.CloneID, resp.Volume.ContentSource.CloneID) require.Equal(t, tc.ContentSource.SnapshotID, resp.Volume.ContentSource.SnapshotID) } + if tc.Response != nil && tc.Response.Volume != nil { + require.Len(t, resp.Volume.AccessibleTopology, 1) + require.Equal(t, + req.AccessibilityRequirements.Requisite[0].Segments, + resp.Volume.AccessibleTopology[0].Segments, + ) + } + }) } } diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index 54c664b8fe63..c03390688b69 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -581,10 +581,11 @@ type ControllerCreateVolumeResponse struct { func NewCreateVolumeResponse(resp *csipbv1.CreateVolumeResponse) *ControllerCreateVolumeResponse { vol := resp.GetVolume() return &ControllerCreateVolumeResponse{Volume: &Volume{ - CapacityBytes: vol.GetCapacityBytes(), - ExternalVolumeID: vol.GetVolumeId(), - VolumeContext: vol.GetVolumeContext(), - ContentSource: newVolumeContentSource(vol.GetContentSource()), + CapacityBytes: vol.GetCapacityBytes(), + ExternalVolumeID: vol.GetVolumeId(), + VolumeContext: vol.GetVolumeContext(), + ContentSource: newVolumeContentSource(vol.GetContentSource()), + AccessibleTopology: newTopologies(vol.GetAccessibleTopology()), }} } diff --git a/scheduler/feasible.go b/scheduler/feasible.go index fecd97286034..3c478eb695d6 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -27,6 +27,7 @@ const ( FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released" FilterConstraintDrivers = "missing drivers" FilterConstraintDevices = "missing devices" + 
FilterConstraintsCSIPluginTopology = "did not meet topology requirement" ) var ( @@ -313,6 +314,22 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) { return false, fmt.Sprintf(FilterConstraintCSIPluginMaxVolumesTemplate, vol.PluginID, n.ID) } + // CSI spec: "If requisite is specified, the provisioned + // volume MUST be accessible from at least one of the + // requisite topologies." + if len(vol.Topologies) > 0 { + var ok bool + for _, requiredTopo := range vol.Topologies { + if requiredTopo.Equal(plugin.NodeInfo.AccessibleTopology) { + ok = true + break + } + } + if !ok { + return false, FilterConstraintsCSIPluginTopology + } + } + if req.ReadOnly { if !vol.ReadSchedulable() { return false, fmt.Sprintf(FilterConstraintCSIVolumeNoReadTemplate, vol.ID) diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index da590c438a4b..26e2e79072da 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -240,6 +241,8 @@ func TestCSIVolumeChecker(t *testing.T) { mock.Node(), mock.Node(), mock.Node(), + mock.Node(), + mock.Node(), } // Register running plugins on some nodes @@ -254,28 +257,69 @@ func TestCSIVolumeChecker(t *testing.T) { "foo": { PluginID: "foo", Healthy: true, - NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, }, } nodes[1].CSINodePlugins = map[string]*structs.CSIInfo{ "foo": { PluginID: "foo", Healthy: false, - NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, }, } nodes[2].CSINodePlugins = map[string]*structs.CSIInfo{ "bar": { PluginID: "bar", Healthy: true, - NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, }, } nodes[4].CSINodePlugins = map[string]*structs.CSIInfo{ "foo": { PluginID: "foo", Healthy: true, - NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, + }, + } + nodes[5].CSINodePlugins = map[string]*structs.CSIInfo{ + "foo": { + PluginID: "foo", + Healthy: true, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R4"}, + }, + }, + }, + } + nodes[6].CSINodePlugins = map[string]*structs.CSIInfo{ + "foo": { + PluginID: "foo", + Healthy: true, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + }, }, } @@ -294,6 +338,10 @@ func TestCSIVolumeChecker(t *testing.T) { vol.Namespace = structs.DefaultNamespace vol.AccessMode = structs.CSIVolumeAccessModeMultiNodeMultiWriter vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem + vol.Topologies = []*structs.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + } err := state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(t, err) index++ @@ -361,52 +409,70 @@ func TestCSIVolumeChecker(t 
*testing.T) { checker.SetNamespace(structs.DefaultNamespace) cases := []struct { + Name string Node *structs.Node RequestedVolumes map[string]*structs.VolumeRequest Result bool }{ - { // Get it + { + Name: "ok", Node: nodes[0], RequestedVolumes: volumes, Result: true, }, - { // Unhealthy + { + Name: "unhealthy node", Node: nodes[1], RequestedVolumes: volumes, Result: false, }, - { // Wrong id + { + Name: "wrong id", Node: nodes[2], RequestedVolumes: volumes, Result: false, }, - { // No Volumes requested or available + { + Name: "no volumes requested or available", Node: nodes[3], RequestedVolumes: noVolumes, Result: true, }, - { // No Volumes requested, some available + { + Name: "no volumes requested, some available", Node: nodes[0], RequestedVolumes: noVolumes, Result: true, }, - { // Volumes requested, none available + { + Name: "volumes requested, none available", Node: nodes[3], RequestedVolumes: volumes, Result: false, }, - { // Volumes requested, MaxVolumes exceeded + { + Name: "volumes requested, max volumes exceeded", Node: nodes[4], RequestedVolumes: volumes, Result: false, }, + { + Name: "no matching topology", + Node: nodes[5], + RequestedVolumes: volumes, + Result: false, + }, + { + Name: "nil topology", + Node: nodes[6], + RequestedVolumes: volumes, + Result: false, + }, } - for i, c := range cases { + for _, c := range cases { checker.SetVolumes(alloc.Name, c.RequestedVolumes) - if act := checker.Feasible(c.Node); act != c.Result { - t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) - } + assert.Equal(t, c.Result, checker.Feasible(c.Node), c.Name) } // add a missing volume diff --git a/website/content/docs/commands/volume/create.mdx b/website/content/docs/commands/volume/create.mdx index dcf276d99679..0ffc78da02a6 100644 --- a/website/content/docs/commands/volume/create.mdx +++ b/website/content/docs/commands/volume/create.mdx @@ -59,6 +59,16 @@ mount_options { mount_flags = ["noatime"] } +topology_request { + required { + topology { segments { "rack" = "R2" } } + topology { segments { "rack" = "R1", "zone" = "us-east-1a"} } + } + preferred { + topology { segments { "rack" = "R1", "zone" = "us-east-1a"} } + } +} + secrets { example_secret = "xyzzy" } @@ -134,6 +144,15 @@ parameters { - `mount_flags` `([]string: )` - The flags passed to `mount` (ex. `["ro", "noatime"]`) +- `topology_request` ([TopologyRequest](#topology_request-parameters): nil) - + Specify locations (region, zone, rack, etc.) where the provisioned + volume must be accessible from. Consult the documentation for your + storage provider and CSI plugin as to whether it supports defining + topology and what values it expects for topology + segments. Specifying topology segments that aren't supported by the + storage provider may return an error or may be silently removed by + the plugin. + - `secrets` (map:nil) - An optional key-value map of strings used as credentials for publishing and unpublishing volumes. @@ -144,6 +163,40 @@ parameters { to each storage provider, so please see the specific plugin documentation for more information. +### `topology_request` Parameters + +For the `topology_request` field, you may specify a list of either +`required` or `preferred` topologies (or both). The `required` +topologies indicate that the volume must be created in a location +accessible from at least one of the listed topologies. The `preferred` +topologies indicate that you would prefer the storage provider to +create the volume in one of the provided topologies. 
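For readers tracing how these blocks reach the storage provider: a minimal sketch, assuming the `Required` → `Requisite` and `Preferred` → `Preferred` mapping added in `client/structs/csi.go` earlier in this change, of the CSI `TopologyRequirement` that the example at the top of this page would produce. The `csi` types here are the CSI spec's Go bindings (`github.com/container-storage-interface/spec/lib/go/csi`), not part of this changeset.

```go
package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func main() {
	// The topology_request block from the example volume specification,
	// expressed as the TopologyRequirement handed to the controller
	// plugin's CreateVolume RPC: required -> Requisite, preferred -> Preferred.
	req := &csi.TopologyRequirement{
		Requisite: []*csi.Topology{
			{Segments: map[string]string{"rack": "R2"}},
			{Segments: map[string]string{"rack": "R1", "zone": "us-east-1a"}},
		},
		Preferred: []*csi.Topology{
			{Segments: map[string]string{"rack": "R1", "zone": "us-east-1a"}},
		},
	}
	fmt.Println(req.String())
}
```

Per the CSI specification, the plugin must provision the volume so that it is accessible from at least one of the requisite topologies, and should attempt the preferred topologies first; plugins that do not support accessibility constraints may ignore or reject the request.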
+ +Each topology listed has a single field: + +- `segments` `(map[string]string)` - A map of location types to their + values. The specific fields required are defined by the CSI + plugin. For example, a plugin might require defining both a rack and + a zone: `segments {rack = "R2", zone = "us-east-1a"}`. + +For example: + +```hcl +topology_request { + required { + topology { segments { "rack" = "R1", "zone" = "us-east-1a" } } + topology { segments { "rack" = "R2", "zone" = "us-east-1a" } } + } + preferred { + topology { segments { "rack" = "R1", "zone" = "us-east-1a"} } + } +} +``` + +This configuration indicates you require the volume to be created +within racks `R1` or `R2`, but that you prefer the volume to be +created within `R1`. + ### Unused Fields Note that several fields used in the [`volume register`] command are set diff --git a/website/content/docs/commands/volume/register.mdx b/website/content/docs/commands/volume/register.mdx index 1e2023250094..343f8b94269d 100644 --- a/website/content/docs/commands/volume/register.mdx +++ b/website/content/docs/commands/volume/register.mdx @@ -61,6 +61,13 @@ mount_options { mount_flags = ["noatime"] } +topology_request { + required { + topology { segments { "rack" = "R2" } } + topology { segments { "rack" = "R1", "zone" = "us-east-1a"} } + } +} + secrets { example_secret = "xyzzy" } @@ -120,6 +127,15 @@ context { - `fs_type`: file system type (ex. `"ext4"`) - `mount_flags`: the flags passed to `mount` (ex. `"ro,noatime"`) +- `topology_request` ([TopologyRequest](#topology_request-parameters): nil) - + Specify locations (region, zone, rack, etc.) where the provisioned + volume is accessible from. Consult the documentation for your + storage provider and CSI plugin as to whether it supports defining + topology and what values it expects for topology + segments. Specifying topology segments that aren't supported by the + storage provider may return an error or may be silently removed by + the plugin. + - `secrets` (map:nil) - An optional key-value map of strings used as credentials for publishing and unpublishing volumes. @@ -136,6 +152,38 @@ context { each storage provider, so please see the specific plugin documentation for more information. +### `topology_request` Parameters + +For the `topology_request` field, you may specify a list of `required` +topologies. The `required` topologies indicate that the volume was +created in a location accessible from all the listed topologies. + +Note this behavior is different from the `nomad volume create` +command, because the volume has already been created and you are +defining the topology for Nomad. The `register` command does not +support setting `preferred` topologies. + +Each topology listed has a single field: + +- `segments` `(map[string]string)` - A map of location types to their + values. The specific fields required are defined by the CSI + plugin. For example, a plugin might require defining both a rack and + a zone: `segments {rack = "R2", zone = "us-east-1a"}`. + +For example: + +```hcl +topology_request { + required { + topology { segments { "rack" = "R1", "zone" = "us-east-1a" } } + topology { segments { "rack" = "R2", "zone" = "us-east-1a" } } + } +} +``` + +This configuration indicates that the volume is accessible from both +racks `R1` or `R2`. + ### Unused Fields Note that several fields used in the [`volume create`] command are set