From 1ba340b003d004f0d18341f8fd169249feafb03a Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 24 Feb 2022 10:44:22 -0500 Subject: [PATCH] CSI: implement support for topology --- api/csi.go | 2 +- client/csi_endpoint.go | 6 ++ client/structs/csi.go | 20 ++-- command/plugin_status_csi.go | 33 ++++++- command/volume_init.go | 18 ++++ command/volume_register_csi.go | 27 ++++++ command/volume_register_test.go | 23 +++++ command/volume_status_csi.go | 13 ++- e2e/csi/input/ebs-volume0.hcl | 7 ++ e2e/csi/input/ebs-volume1.hcl | 7 ++ e2e/csi/input/plugin-aws-ebs-controller.nomad | 2 +- e2e/csi/input/plugin-aws-ebs-nodes.nomad | 2 +- e2e/terraform/variables.tf | 2 +- nomad/csi_endpoint.go | 2 + nomad/csi_endpoint_test.go | 8 ++ nomad/structs/csi.go | 21 +--- nomad/structs/csi_test.go | 13 +++ plugins/csi/client.go | 5 + plugins/csi/client_test.go | 30 +++++- plugins/csi/plugin.go | 9 +- scheduler/feasible.go | 17 ++++ scheduler/feasible_test.go | 96 ++++++++++++++++--- .../content/docs/commands/volume/create.mdx | 23 +++++ .../content/docs/commands/volume/register.mdx | 23 +++++ 24 files changed, 356 insertions(+), 53 deletions(-) diff --git a/api/csi.go b/api/csi.go index 120c239fde8f..f08b7bdb884d 100644 --- a/api/csi.go +++ b/api/csi.go @@ -212,7 +212,7 @@ type CSIVolume struct { Name string ExternalID string `mapstructure:"external_id" hcl:"external_id"` Namespace string - Topologies []*CSITopology + Topologies []*CSITopology `hcl:"required_topology"` AccessMode CSIVolumeAccessMode `hcl:"access_mode"` AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"` MountOptions *CSIMountOptions `hcl:"mount_options"` diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index 029c91ede62a..2438edce0a1b 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -216,6 +216,12 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum resp.CapacityBytes = cresp.Volume.CapacityBytes resp.VolumeContext = cresp.Volume.VolumeContext + 
resp.Topologies = make([]*nstructs.CSITopology, 0, len(cresp.Volume.AccessibleTopology)) + for _, topo := range cresp.Volume.AccessibleTopology { + resp.Topologies = append(resp.Topologies, + &nstructs.CSITopology{Segments: topo.Segments}) + } + return nil } diff --git a/client/structs/csi.go b/client/structs/csi.go index fb6ca6c7aeee..7f45f4d8981c 100644 --- a/client/structs/csi.go +++ b/client/structs/csi.go @@ -220,8 +220,7 @@ type ClientCSIControllerCreateVolumeRequest struct { CapacityMax int64 SnapshotID string CloneID string - // TODO: topology is not yet supported - // TopologyRequirement + Topologies []*structs.CSITopology CSIControllerQuery } @@ -237,8 +236,9 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll CloneID: req.CloneID, SnapshotID: req.SnapshotID, }, - // TODO: topology is not yet supported - AccessibilityRequirements: &csi.TopologyRequirement{}, + AccessibilityRequirements: &csi.TopologyRequirement{ + Requisite: []*csi.Topology{}, + }, } // The CSI spec requires that at least one of the fields in CapacityRange @@ -258,6 +258,14 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll } creq.VolumeCapabilities = append(creq.VolumeCapabilities, ccap) } + + for _, topo := range req.Topologies { + creq.AccessibilityRequirements.Requisite = append( + creq.AccessibilityRequirements.Requisite, &csi.Topology{ + Segments: topo.Segments, + }) + } + return creq, nil } @@ -265,9 +273,7 @@ type ClientCSIControllerCreateVolumeResponse struct { ExternalVolumeID string CapacityBytes int64 VolumeContext map[string]string - - // TODO: topology is not yet supported - // AccessibleTopology []*Topology + Topologies []*structs.CSITopology } // ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a diff --git a/command/plugin_status_csi.go b/command/plugin_status_csi.go index a72de5f206ac..740cc7680763 100644 --- a/command/plugin_status_csi.go +++ b/command/plugin_status_csi.go @@ -118,6 
+118,12 @@ func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, erro full = append(full, c.Colorize().Color("\n[bold]Node Capabilities[reset]")) full = append(full, nodeCaps) } + topos := c.formatTopology(plug.Nodes) + if topos != "" { + full = append(full, c.Colorize().Color("\n[bold]Accessible Topologies[reset]")) + full = append(full, topos) + } + } // Format the allocs @@ -177,12 +183,16 @@ func (c *PluginStatusCommand) formatControllerCaps(controllers map[string]*api.C return "" } - return strings.Join(caps, "\n\t") + return " " + strings.Join(caps, "\n ") } func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) string { caps := []string{} for _, node := range nodes { + // TODO: move this up to top-level k/v? + if node.RequiresTopologies { + caps = append(caps, "VOLUME_ACCESSIBILITY_CONSTRAINTS") + } switch info := node.NodeInfo; { case info.RequiresNodeStageVolume: caps = append(caps, "STAGE_UNSTAGE_VOLUME") @@ -207,3 +217,24 @@ func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) stri return " " + strings.Join(caps, "\n ") } + +func (c *PluginStatusCommand) formatTopology(nodes map[string]*api.CSIInfo) string { + topos := []string{} + for nodeID, node := range nodes { + if node.NodeInfo.AccessibleTopology != nil { + segments := node.NodeInfo.AccessibleTopology.Segments + segmentPairs := make([]string, 0, len(segments)) + for k, v := range segments { + segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v)) + } + topos = append(topos, fmt.Sprintf("Node %s: %v", + nodeID[:8], strings.Join(segmentPairs, ","))) + } + } + + if len(topos) == 0 { + return "" + } + + return " " + strings.Join(topos, "\n ") +} diff --git a/command/volume_init.go b/command/volume_init.go index c1714f7bf345..dcd7976ea879 100644 --- a/command/volume_init.go +++ b/command/volume_init.go @@ -150,6 +150,16 @@ mount_options { mount_flags = ["ro"] } +# Optional: specify one or more locations where the volume must be 
accessible +# from. Refer to the plugin documentation for what segment values are supported. +required_topology { + segments {rack = "R1", zone = "us-east-1a"} +} + +required_topology { + segments {rack = "R2", zone = "us-east-1b"} +} + # Optional: provide any secrets specified by the plugin. secrets { example_secret = "xyzzy" @@ -201,6 +211,14 @@ var defaultJsonVolumeSpec = strings.TrimSpace(` ] } ], + "required_topology": [ + { + "segments": {"rack": "R1", "zone": "us-east-1a"} + }, + { + "segments": {"rack": "R2", "zone": "us-east-1b"} + } + ], "parameters": [ { "skuname": "Premium_LRS" diff --git a/command/volume_register_csi.go b/command/volume_register_csi.go index aa68d63516e6..99cec97647dc 100644 --- a/command/volume_register_csi.go +++ b/command/volume_register_csi.go @@ -48,6 +48,7 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) { delete(m, "mount_options") delete(m, "capacity_max") delete(m, "capacity_min") + delete(m, "required_topology") delete(m, "type") // Decode the rest @@ -116,6 +117,32 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) { } } + requiredTopos := list.Filter("required_topology") + if len(requiredTopos.Items) > 0 { + + for _, o := range requiredTopos.Elem().Items { + if err := helper.CheckHCLKeys(o.Val, []string{"segments"}); err != nil { + return nil, err + } + + ot, ok := o.Val.(*ast.ObjectType) + if !ok { + break + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, ot.List); err != nil { + return nil, err + } + var topo *api.CSITopology + if err := mapstructure.WeakDecode(&m, &topo); err != nil { + return nil, err + } + + vol.Topologies = append(vol.Topologies, topo) + } + } + return vol, nil } diff --git a/command/volume_register_test.go b/command/volume_register_test.go index b69816bfae6b..a8e89e143901 100644 --- a/command/volume_register_test.go +++ b/command/volume_register_test.go @@ -84,6 +84,14 @@ capability { access_mode = "single-node-reader-only" attachment_mode = 
"block-device" } + +required_topology { + segments {rack = "R1"} +} + +required_topology { + segments {rack = "R2", zone = "us-east-1a"} +} `, expected: &api.CSIVolume{ ID: "testvolume", @@ -108,6 +116,10 @@ capability { }, Parameters: map[string]string{"skuname": "Premium_LRS"}, Secrets: map[string]string{"password": "xyzzy"}, + Topologies: []*api.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}}, + }, }, err: "", }, { @@ -124,6 +136,14 @@ capability { access_mode = "single-node-writer" attachment_mode = "file-system" } + +# make sure we safely handle empty blocks even +# if they're invalid +require_topology {} + +required_topology { + segments {rack = "R2", zone = "us-east-1a"} +} `, expected: &api.CSIVolume{ ID: "testvolume", @@ -136,6 +156,9 @@ capability { AttachmentMode: api.CSIVolumeAttachmentModeFilesystem, }, }, + Topologies: []*api.CSITopology{ + {Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}}, + }, }, err: "", }, diff --git a/command/volume_status_csi.go b/command/volume_status_csi.go index bce7379cc292..e3a6506cfda5 100644 --- a/command/volume_status_csi.go +++ b/command/volume_status_csi.go @@ -212,10 +212,21 @@ func (c *VolumeStatusCommand) formatBasic(vol *api.CSIVolume) (string, error) { return formatKV(output), nil } + full := []string{formatKV(output)} + + if len(vol.Topologies) > 0 { + topoBanner := c.Colorize().Color("\n[bold]Topologies[reset]") + topo := c.formatTopologies(vol) + full = append(full, topoBanner) + full = append(full, topo) + } + // Format the allocs banner := c.Colorize().Color("\n[bold]Allocations[reset]") allocs := formatAllocListStubs(vol.Allocations, c.verbose, c.length) - full := []string{formatKV(output), banner, allocs} + full = append(full, banner) + full = append(full, allocs) + return strings.Join(full, "\n"), nil } diff --git a/e2e/csi/input/ebs-volume0.hcl b/e2e/csi/input/ebs-volume0.hcl index bf961efedd67..b4a6c30b029c 
100644 --- a/e2e/csi/input/ebs-volume0.hcl +++ b/e2e/csi/input/ebs-volume0.hcl @@ -19,3 +19,10 @@ capability { parameters { type = "gp2" } + +required_topology { + segments { + # this zone should match the one set in e2e/terraform/variables.tf + "topology.ebs.csi.aws.com/zone" = "us-east-1b" + } +} diff --git a/e2e/csi/input/ebs-volume1.hcl b/e2e/csi/input/ebs-volume1.hcl index df38b9034727..eaba570a221b 100644 --- a/e2e/csi/input/ebs-volume1.hcl +++ b/e2e/csi/input/ebs-volume1.hcl @@ -19,3 +19,10 @@ capability { parameters { type = "gp2" } + +required_topology { + segments { + # this zone should match the one set in e2e/terraform/variables.tf + "topology.ebs.csi.aws.com/zone" = "us-east-1b" + } +} diff --git a/e2e/csi/input/plugin-aws-ebs-controller.nomad b/e2e/csi/input/plugin-aws-ebs-controller.nomad index b4bd6d626ac3..dd0b675c79d2 100644 --- a/e2e/csi/input/plugin-aws-ebs-controller.nomad +++ b/e2e/csi/input/plugin-aws-ebs-controller.nomad @@ -22,7 +22,7 @@ job "plugin-aws-ebs-controller" { driver = "docker" config { - image = "amazon/aws-ebs-csi-driver:v0.9.0" + image = "public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1" args = [ "controller", diff --git a/e2e/csi/input/plugin-aws-ebs-nodes.nomad b/e2e/csi/input/plugin-aws-ebs-nodes.nomad index 3411990a3168..206b1df81f0c 100644 --- a/e2e/csi/input/plugin-aws-ebs-nodes.nomad +++ b/e2e/csi/input/plugin-aws-ebs-nodes.nomad @@ -19,7 +19,7 @@ job "plugin-aws-ebs-nodes" { driver = "docker" config { - image = "amazon/aws-ebs-csi-driver:v0.9.0" + image = "public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1" args = [ "node", diff --git a/e2e/terraform/variables.tf b/e2e/terraform/variables.tf index c48c6ef67f41..49f8f879057a 100644 --- a/e2e/terraform/variables.tf +++ b/e2e/terraform/variables.tf @@ -10,7 +10,7 @@ variable "region" { variable "availability_zone" { description = "The AWS availability zone to deploy to." 
- default = "us-east-1a" + default = "us-east-1b" } variable "instance_type" { diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index ac63b8fa5659..c96a0fda1fa9 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -907,6 +907,7 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug CapacityMax: vol.RequestedCapacityMax, SnapshotID: vol.SnapshotID, CloneID: vol.CloneID, + Topologies: vol.Topologies, } cReq.PluginID = plugin.ID cResp := &cstructs.ClientCSIControllerCreateVolumeResponse{} @@ -918,6 +919,7 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug vol.ExternalID = cResp.ExternalVolumeID vol.Capacity = cResp.CapacityBytes vol.Context = cResp.VolumeContext + vol.Topologies = cResp.Topologies return nil } diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 0d06a85910ae..9d4a2f73a5be 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -754,6 +754,9 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) { ExternalVolumeID: "vol-12345", CapacityBytes: 42, VolumeContext: map[string]string{"plugincontext": "bar"}, + Topologies: []*structs.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + }, } client, cleanup := client.TestClientWithRPCs(t, @@ -829,6 +832,10 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, }, }, + Topologies: []*structs.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"zone": "Z2"}}, + }, }} // Create the create request @@ -872,6 +879,7 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) { require.Equal(t, int64(42), vol.Capacity) require.Equal(t, "bar", vol.Context["plugincontext"]) require.Equal(t, "", vol.Context["mycontext"]) + require.Equal(t, map[string]string{"rack": "R1"}, vol.Topologies[0].Segments) } func TestCSIVolumeEndpoint_Delete(t *testing.T) { diff --git a/nomad/structs/csi.go 
b/nomad/structs/csi.go index 9540d8445b26..210437bf24e3 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -679,19 +679,11 @@ func (v *CSIVolume) Validate() error { if len(v.RequestedCapabilities) == 0 { errs = append(errs, "must include at least one capability block") } - - // TODO: Volume Topologies are optional - We should check to see if the plugin - // the volume is being registered with requires them. - // var ok bool - // for _, t := range v.Topologies { - // if t != nil && len(t.Segments) > 0 { - // ok = true - // break - // } - // } - // if !ok { - // errs = append(errs, "missing topology") - // } + for _, t := range v.Topologies { + if t != nil && len(t.Segments) == 0 { + errs = append(errs, "topology is missing segments field") + } + } if len(errs) > 0 { return fmt.Errorf("validation: %s", strings.Join(errs, ", ")) @@ -836,9 +828,6 @@ type CSIVolumeExternalStub struct { CloneID string SnapshotID string - // TODO: topology support - // AccessibleTopology []*Topology - PublishedExternalNodeIDs []string IsAbnormal bool Status string diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 32d150816a19..633cb2671d4a 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -554,6 +554,19 @@ func TestVolume_Copy(t *testing.T) { } +func TestCSIVolume_Validate(t *testing.T) { + vol := &CSIVolume{ + ID: "test", + PluginID: "test", + SnapshotID: "test-snapshot", + CloneID: "test-clone", + Topologies: []*CSITopology{{}, {}}, + } + err := vol.Validate() + require.EqualError(t, err, "validation: missing namespace, only one of snapshot_id and clone_id is allowed, must include at least one capability block, topology is missing segments field, topology is missing segments field") + +} + func TestCSIPluginJobs(t *testing.T) { plug := NewCSIPlugin("foo", 1000) controller := &Job{ diff --git a/plugins/csi/client.go b/plugins/csi/client.go index 6d34d8f55aa2..8c5185bbefff 100644 --- a/plugins/csi/client.go +++ 
b/plugins/csi/client.go @@ -740,6 +740,11 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) result.MaxVolumes = math.MaxInt64 } + topo := resp.GetAccessibleTopology() + if topo != nil { + result.AccessibleTopology = &Topology{Segments: topo.Segments} + } + return result, nil } diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 1c951b2bac96..6554662c77d7 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -746,7 +746,7 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { }, { - Name: "handles success with capacity range and source", + Name: "handles success with capacity range, source, and topology", CapacityRange: &CapacityRange{ RequiredBytes: 500, LimitBytes: 1000, @@ -764,6 +764,9 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { }, }, }, + AccessibleTopology: []*csipbv1.Topology{ + {Segments: map[string]string{"rack": "R1"}}, + }, }, }, }, @@ -782,10 +785,19 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { AccessMode: VolumeAccessModeMultiNodeMultiWriter, }, }, - Parameters: map[string]string{}, - Secrets: structs.CSISecrets{}, - ContentSource: tc.ContentSource, - AccessibilityRequirements: &TopologyRequirement{}, + Parameters: map[string]string{}, + Secrets: structs.CSISecrets{}, + ContentSource: tc.ContentSource, + AccessibilityRequirements: &TopologyRequirement{ + Requisite: []*Topology{ + { + Segments: map[string]string{"rack": "R1"}, + }, + { + Segments: map[string]string{"rack": "R2"}, + }, + }, + }, } cc.NextCreateVolumeResponse = tc.Response @@ -808,6 +820,14 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { require.Equal(t, tc.ContentSource.CloneID, resp.Volume.ContentSource.CloneID) require.Equal(t, tc.ContentSource.SnapshotID, resp.Volume.ContentSource.SnapshotID) } + if tc.Response != nil && tc.Response.Volume != nil { + require.Len(t, resp.Volume.AccessibleTopology, 1) + require.Equal(t, + 
req.AccessibilityRequirements.Requisite[0].Segments, + resp.Volume.AccessibleTopology[0].Segments, + ) + } + }) } } diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index 54c664b8fe63..c03390688b69 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -581,10 +581,11 @@ type ControllerCreateVolumeResponse struct { func NewCreateVolumeResponse(resp *csipbv1.CreateVolumeResponse) *ControllerCreateVolumeResponse { vol := resp.GetVolume() return &ControllerCreateVolumeResponse{Volume: &Volume{ - CapacityBytes: vol.GetCapacityBytes(), - ExternalVolumeID: vol.GetVolumeId(), - VolumeContext: vol.GetVolumeContext(), - ContentSource: newVolumeContentSource(vol.GetContentSource()), + CapacityBytes: vol.GetCapacityBytes(), + ExternalVolumeID: vol.GetVolumeId(), + VolumeContext: vol.GetVolumeContext(), + ContentSource: newVolumeContentSource(vol.GetContentSource()), + AccessibleTopology: newTopologies(vol.GetAccessibleTopology()), }} } diff --git a/scheduler/feasible.go b/scheduler/feasible.go index fecd97286034..3c478eb695d6 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -27,6 +27,7 @@ const ( FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released" FilterConstraintDrivers = "missing drivers" FilterConstraintDevices = "missing devices" + FilterConstraintsCSIPluginTopology = "did not meet topology requirement" ) var ( @@ -313,6 +314,22 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) { return false, fmt.Sprintf(FilterConstraintCSIPluginMaxVolumesTemplate, vol.PluginID, n.ID) } + // CSI spec: "If requisite is specified, the provisioned + // volume MUST be accessible from at least one of the + // requisite topologies." 
+ if len(vol.Topologies) > 0 { + var ok bool + for _, requiredTopo := range vol.Topologies { + if requiredTopo.Equal(plugin.NodeInfo.AccessibleTopology) { + ok = true + break + } + } + if !ok { + return false, FilterConstraintsCSIPluginTopology + } + } + if req.ReadOnly { if !vol.ReadSchedulable() { return false, fmt.Sprintf(FilterConstraintCSIVolumeNoReadTemplate, vol.ID) diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index da590c438a4b..26e2e79072da 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -240,6 +241,8 @@ func TestCSIVolumeChecker(t *testing.T) { mock.Node(), mock.Node(), mock.Node(), + mock.Node(), + mock.Node(), } // Register running plugins on some nodes @@ -254,28 +257,69 @@ func TestCSIVolumeChecker(t *testing.T) { "foo": { PluginID: "foo", Healthy: true, - NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, }, } nodes[1].CSINodePlugins = map[string]*structs.CSIInfo{ "foo": { PluginID: "foo", Healthy: false, - NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, }, } nodes[2].CSINodePlugins = map[string]*structs.CSIInfo{ "bar": { PluginID: "bar", Healthy: true, - NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, }, } nodes[4].CSINodePlugins = map[string]*structs.CSIInfo{ "foo": { PluginID: "foo", Healthy: true, - NodeInfo: 
&structs.CSINodeInfo{MaxVolumes: 1}, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R1"}, + }, + }, + }, + } + nodes[5].CSINodePlugins = map[string]*structs.CSIInfo{ + "foo": { + PluginID: "foo", + Healthy: true, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + AccessibleTopology: &structs.CSITopology{ + Segments: map[string]string{"rack": "R4"}, + }, + }, + }, + } + nodes[6].CSINodePlugins = map[string]*structs.CSIInfo{ + "foo": { + PluginID: "foo", + Healthy: true, + NodeInfo: &structs.CSINodeInfo{ + MaxVolumes: 1, + }, }, } @@ -294,6 +338,10 @@ func TestCSIVolumeChecker(t *testing.T) { vol.Namespace = structs.DefaultNamespace vol.AccessMode = structs.CSIVolumeAccessModeMultiNodeMultiWriter vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem + vol.Topologies = []*structs.CSITopology{ + {Segments: map[string]string{"rack": "R1"}}, + {Segments: map[string]string{"rack": "R2"}}, + } err := state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(t, err) index++ @@ -361,52 +409,70 @@ func TestCSIVolumeChecker(t *testing.T) { checker.SetNamespace(structs.DefaultNamespace) cases := []struct { + Name string Node *structs.Node RequestedVolumes map[string]*structs.VolumeRequest Result bool }{ - { // Get it + { + Name: "ok", Node: nodes[0], RequestedVolumes: volumes, Result: true, }, - { // Unhealthy + { + Name: "unhealthy node", Node: nodes[1], RequestedVolumes: volumes, Result: false, }, - { // Wrong id + { + Name: "wrong id", Node: nodes[2], RequestedVolumes: volumes, Result: false, }, - { // No Volumes requested or available + { + Name: "no volumes requested or available", Node: nodes[3], RequestedVolumes: noVolumes, Result: true, }, - { // No Volumes requested, some available + { + Name: "no volumes requested, some available", Node: nodes[0], RequestedVolumes: noVolumes, Result: true, }, - { // Volumes requested, none available + { + Name: "volumes 
requested, none available", Node: nodes[3], RequestedVolumes: volumes, Result: false, }, - { // Volumes requested, MaxVolumes exceeded + { + Name: "volumes requested, max volumes exceeded", Node: nodes[4], RequestedVolumes: volumes, Result: false, }, + { + Name: "no matching topology", + Node: nodes[5], + RequestedVolumes: volumes, + Result: false, + }, + { + Name: "nil topology", + Node: nodes[6], + RequestedVolumes: volumes, + Result: false, + }, } - for i, c := range cases { + for _, c := range cases { checker.SetVolumes(alloc.Name, c.RequestedVolumes) - if act := checker.Feasible(c.Node); act != c.Result { - t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) - } + assert.Equal(t, c.Result, checker.Feasible(c.Node), c.Name) } // add a missing volume diff --git a/website/content/docs/commands/volume/create.mdx b/website/content/docs/commands/volume/create.mdx index dcf276d99679..cf68ac0a22f4 100644 --- a/website/content/docs/commands/volume/create.mdx +++ b/website/content/docs/commands/volume/create.mdx @@ -59,6 +59,14 @@ mount_options { mount_flags = ["noatime"] } +required_topology { + segments { "rack" = "R1", "zone" = "us-east-1a"} +} + +required_topology { + segments { "rack" = "R2" } +} + secrets { example_secret = "xyzzy" } @@ -134,6 +142,21 @@ parameters { - `mount_flags` `([]string: )` - The flags passed to `mount` (ex. `["ro", "noatime"]`) +- `required_topology` - Specify a location (region, zone, rack, etc.) + where the provisioned volume must be accessible from. Consult the + documentation for your storage provider and CSI plugin as to whether + it supports defining topology and what values it expects for + topology segments. Specifying topology segments that aren't + supported by the storage provider may return an error or may be + silently removed by the plugin. You may provide multiple + `required_topology` blocks, and the scheduler will ensure at least + one matches the node where a volume is placed. 
+ + - `segments` `(map[string]string)` - A map of location types to + their values. The specific fields required are defined by the CSI + plugin. For example, a plugin might require defining both a rack + and a zone: `segments {rack = "R2", zone = "us-east-1a"}`. + - `secrets` (map:nil) - An optional key-value map of strings used as credentials for publishing and unpublishing volumes. diff --git a/website/content/docs/commands/volume/register.mdx b/website/content/docs/commands/volume/register.mdx index 1e2023250094..ff271dc86b6a 100644 --- a/website/content/docs/commands/volume/register.mdx +++ b/website/content/docs/commands/volume/register.mdx @@ -61,6 +61,14 @@ mount_options { mount_flags = ["noatime"] } +required_topology { + segments { "rack" = "R1", "zone" = "us-east-1a"} +} + +required_topology { + segments { "rack" = "R2" } +} + secrets { example_secret = "xyzzy" } @@ -120,6 +128,21 @@ context { - `fs_type`: file system type (ex. `"ext4"`) - `mount_flags`: the flags passed to `mount` (ex. `"ro,noatime"`) +- `required_topology` - Specify a location (region, zone, rack, etc.) + where the provisioned volume must be accessible from. Consult the + documentation for your storage provider and CSI plugin as to whether + it supports defining topology and what values it expects for + topology segments. Specifying topology segments that aren't + supported by the storage provider may return an error or may be + silently removed by the plugin. You may provide multiple + `required_topology` blocks, and the scheduler will ensure at least + one matches the node where a volume is placed. + + - `segments` `(map[string]string)` - A map of location types to + their values. The specific fields required are defined by the CSI + plugin. For example, a plugin might require defining both a rack + and a zone: `segments {rack = "R2", zone = "us-east-1a"}`. + - `secrets` (map:nil) - An optional key-value map of strings used as credentials for publishing and unpublishing volumes.