Skip to content

Commit

Permalink
CSI: move node unmount to server-driven RPCs (#7596)
Browse files Browse the repository at this point in the history
If a volume-claiming alloc stops and the CSI Node plugin that serves
that alloc's volumes is missing, there's no way for the allocrunner
hook to send the `NodeUnpublish` and `NodeUnstage` RPCs.

This changeset addresses this issue with a redesign of the client-side
for CSI. Rather than unmounting in the alloc runner hook, the alloc
runner hook will simply exit. When the server gets the
`Node.UpdateAlloc` for the terminal allocation that had a volume claim,
it creates a volume claim GC job. This job will made client RPCs to a
new node plugin RPC endpoint, and only once that succeeds, move on to
making the client RPCs to the controller plugin. If the node plugin is
unavailable, the GC job will fail and be requeued.
  • Loading branch information
tgross committed Apr 2, 2020
1 parent 7c1bc8c commit 414caf7
Show file tree
Hide file tree
Showing 19 changed files with 677 additions and 513 deletions.
86 changes: 1 addition & 85 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -45,7 +44,7 @@ func (c *csiHook) Prerun() error {

mounts := make(map[string]*csimanager.MountInfo, len(volumes))
for alias, pair := range volumes {
mounter, err := c.csimanager.MounterForVolume(ctx, pair.volume)
mounter, err := c.csimanager.MounterForPlugin(ctx, pair.volume.PluginID)
if err != nil {
return err
}
Expand All @@ -72,53 +71,6 @@ func (c *csiHook) Prerun() error {
return nil
}

func (c *csiHook) Postrun() error {
if !c.shouldRun() {
return nil
}

// TODO(tgross): the contexts for the CSI RPC calls made during
// mounting can have very long timeouts. Until these are better
// tuned, there's not a good value to put here for a WithCancel
// without risking conflicts with the grpc retries/timeouts in the
// pluginmanager package.
ctx := context.TODO()

volumes, err := c.csiVolumesFromAlloc()
if err != nil {
return err
}

// For Postrun, we accumulate all unmount errors, rather than stopping on the
// first failure. This is because we want to make a best effort to free all
// storage, and in some cases there may be incorrect errors from volumes that
// never mounted correctly during prerun when an alloc is failed. It may also
// fail because a volume was externally deleted while in use by this alloc.
var result *multierror.Error

for _, pair := range volumes {
mounter, err := c.csimanager.MounterForVolume(ctx, pair.volume)
if err != nil {
result = multierror.Append(result, err)
continue
}

usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: string(pair.volume.AttachmentMode),
AccessMode: string(pair.volume.AccessMode),
}

err = mounter.UnmountVolume(ctx, pair.volume, c.alloc, usageOpts)
if err != nil {
result = multierror.Append(result, err)
continue
}
}

return result.ErrorOrNil()
}

type volumeAndRequest struct {
volume *structs.CSIVolume
request *structs.VolumeRequest
Expand Down Expand Up @@ -172,42 +124,6 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
return result, nil
}

// csiVolumesFromAlloc finds all the CSI Volume requests from the allocation's
// task group and then fetches them from the Nomad Server, before returning
// them in the form of map[RequestedAlias]*volumeAndReqest. This allows us to
// thread the request context through to determine usage options for each volume.
//
// If any volume fails to validate then we return an error.
func (c *csiHook) csiVolumesFromAlloc() (map[string]*volumeAndRequest, error) {
vols := make(map[string]*volumeAndRequest)
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for alias, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
vols[alias] = &volumeAndRequest{request: vol}
}
}

for alias, pair := range vols {
req := &structs.CSIVolumeGetRequest{
ID: pair.request.Source,
}
req.Region = c.alloc.Job.Region

var resp structs.CSIVolumeGetResponse
if err := c.rpcClient.RPC("CSIVolume.Get", req, &resp); err != nil {
return nil, err
}

if resp.Volume == nil {
return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
}

vols[alias].volume = resp.Volume
}

return vols, nil
}

func newCSIHook(logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook {
return &csiHook{
alloc: alloc,
Expand Down
63 changes: 51 additions & 12 deletions client/csi_controller_endpoint.go → client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/csi"
)

// CSIController endpoint is used for interacting with CSI plugins on a client.
// CSI endpoint is used for interacting with CSI plugins on a client.
// TODO: Submit metrics with labels to allow debugging per plugin perf problems.
type CSIController struct {
type CSI struct {
c *Client
}

Expand All @@ -29,10 +30,10 @@ var (
ErrPluginTypeError = errors.New("CSI Plugin loaded incorrectly")
)

// ValidateVolume is used during volume registration to validate
// ControllerValidateVolume is used during volume registration to validate
// that a volume exists and that the capabilities it was registered with are
// supported by the CSI Plugin and external volume configuration.
func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateVolumeRequest, resp *structs.ClientCSIControllerValidateVolumeResponse) error {
func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateVolumeRequest, resp *structs.ClientCSIControllerValidateVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "validate_volume"}, time.Now())

if req.VolumeID == "" {
Expand Down Expand Up @@ -65,15 +66,15 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
}

// AttachVolume is used to attach a volume from a CSI Cluster to
// ControllerAttachVolume is used to attach a volume from a CSI Cluster to
// the storage node provided in the request.
//
// The controller attachment flow currently works as follows:
// 1. Validate the volume request
// 2. Call ControllerPublishVolume on the CSI Plugin to trigger a remote attachment
//
// In the future this may be expanded to request dynamic secrets for attachment.
func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
Expand Down Expand Up @@ -116,9 +117,9 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
return nil
}

// DetachVolume is used to detach a volume from a CSI Cluster from
// ControllerDetachVolume is used to detach a volume from a CSI Cluster from
// the storage node provided in the request.
func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
Expand Down Expand Up @@ -157,12 +158,50 @@ func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolum
return nil
}

func (c *CSIController) findControllerPlugin(name string) (csi.CSIPlugin, error) {
// NodeDetachVolume is used to detach a volume from a CSI Cluster from
// the storage node provided in the request.
func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, resp *structs.ClientCSINodeDetachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_node", "detach_volume"}, time.Now())

// The following block of validation checks should not be reached on a
// real Nomad cluster. They serve as a defensive check before forwarding
// requests to plugins, and to aid with development.
if req.PluginID == "" {
return errors.New("PluginID is required")
}
if req.VolumeID == "" {
return errors.New("VolumeID is required")
}
if req.AllocID == "" {
return errors.New("AllocID is required")
}

ctx, cancelFn := c.requestContext()
defer cancelFn()

mounter, err := c.c.csimanager.MounterForPlugin(ctx, req.PluginID)
if err != nil {
return err
}

usageOpts := &csimanager.UsageOptions{
ReadOnly: req.ReadOnly,
AttachmentMode: string(req.AttachmentMode),
AccessMode: string(req.AccessMode),
}

err = mounter.UnmountVolume(ctx, req.VolumeID, req.AllocID, usageOpts)
if err != nil {
return err
}
return nil
}

func (c *CSI) findControllerPlugin(name string) (csi.CSIPlugin, error) {
return c.findPlugin(dynamicplugins.PluginTypeCSIController, name)
}

// TODO: Cache Plugin Clients?
func (c *CSIController) findPlugin(ptype, name string) (csi.CSIPlugin, error) {
func (c *CSI) findPlugin(ptype, name string) (csi.CSIPlugin, error) {
pIface, err := c.c.dynamicRegistry.DispensePlugin(ptype, name)
if err != nil {
return nil, err
Expand All @@ -176,6 +215,6 @@ func (c *CSIController) findPlugin(ptype, name string) (csi.CSIPlugin, error) {
return plugin, nil
}

func (c *CSIController) requestContext() (context.Context, context.CancelFunc) {
func (c *CSI) requestContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), CSIPluginRequestTimeout)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ var fakePlugin = &dynamicplugins.PluginInfo{
ConnectionInfo: &dynamicplugins.PluginConnectionInfo{},
}

var fakeNodePlugin = &dynamicplugins.PluginInfo{
Name: "test-plugin",
Type: "csi-node",
ConnectionInfo: &dynamicplugins.PluginConnectionInfo{},
}

func TestCSIController_AttachVolume(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -156,7 +162,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
require.Nil(err)

var resp structs.ClientCSIControllerAttachVolumeResponse
err = client.ClientRPC("CSIController.AttachVolume", tc.Request, &resp)
err = client.ClientRPC("CSI.ControllerAttachVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
Expand Down Expand Up @@ -255,7 +261,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
require.Nil(err)

var resp structs.ClientCSIControllerValidateVolumeResponse
err = client.ClientRPC("CSIController.ValidateVolume", tc.Request, &resp)
err = client.ClientRPC("CSI.ControllerValidateVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
Expand Down Expand Up @@ -338,7 +344,88 @@ func TestCSIController_DetachVolume(t *testing.T) {
require.Nil(err)

var resp structs.ClientCSIControllerDetachVolumeResponse
err = client.ClientRPC("CSIController.DetachVolume", tc.Request, &resp)
err = client.ClientRPC("CSI.ControllerDetachVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}

func TestCSINode_DetachVolume(t *testing.T) {
t.Parallel()

cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSINodeDetachVolumeRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSINodeDetachVolumeResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSINodeDetachVolumeRequest{
PluginID: "some-garbage",
VolumeID: "-",
AllocID: "-",
NodeID: "-",
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
ReadOnly: true,
},
ExpectedErr: errors.New("plugin some-garbage for type csi-node not found"),
},
{
Name: "validates volumeid is not empty",
Request: &structs.ClientCSINodeDetachVolumeRequest{
PluginID: fakeNodePlugin.Name,
},
ExpectedErr: errors.New("VolumeID is required"),
},
{
Name: "validates nodeid is not empty",
Request: &structs.ClientCSINodeDetachVolumeRequest{
PluginID: fakeNodePlugin.Name,
VolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("AllocID is required"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextNodeUnpublishVolumeErr = errors.New("wont-see-this")
},
Request: &structs.ClientCSINodeDetachVolumeRequest{
PluginID: fakeNodePlugin.Name,
VolumeID: "1234-4321-1234-4321",
AllocID: "4321-1234-4321-1234",
},
// we don't have a csimanager in this context
ExpectedErr: errors.New("plugin test-plugin for type csi-node not found"),
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()

fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}

dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(dynamicplugins.PluginTypeCSINode, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakeNodePlugin)
require.Nil(err)

var resp structs.ClientCSINodeDetachVolumeResponse
err = client.ClientRPC("CSI.NodeDetachVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
Expand Down
14 changes: 4 additions & 10 deletions client/pluginmanager/csimanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,12 @@ package csimanager

import (
"context"
"errors"
"strings"

"github.com/hashicorp/nomad/client/pluginmanager"
"github.com/hashicorp/nomad/nomad/structs"
)

var (
PluginNotFoundErr = errors.New("Plugin not found")
)

type MountInfo struct {
Source string
IsDevice bool
Expand Down Expand Up @@ -47,17 +42,16 @@ func (u *UsageOptions) ToFS() string {

type VolumeMounter interface {
MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions) error
UnmountVolume(ctx context.Context, volID, allocID string, usageOpts *UsageOptions) error
}

type Manager interface {
// PluginManager returns a PluginManager for use by the node fingerprinter.
PluginManager() pluginmanager.PluginManager

// MounterForVolume returns a VolumeMounter for the given requested volume.
// If there is no plugin registered for this volume type, a PluginNotFoundErr
// will be returned.
MounterForVolume(ctx context.Context, volume *structs.CSIVolume) (VolumeMounter, error)
// MounterForPlugin returns a VolumeMounter for the plugin ID associated
// with the volume. Returns an error if this plugin isn't registered.
MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error)

// Shutdown shuts down the Manager and unmounts any locally attached volumes.
Shutdown()
Expand Down
Loading

0 comments on commit 414caf7

Please sign in to comment.