Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI unpublish error handling improvements #8605

Merged
merged 1 commit into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions client/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package client
import (
"context"
"errors"
"fmt"
"time"

metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
)

Expand Down Expand Up @@ -46,7 +48,9 @@ func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateV

plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()

Expand Down Expand Up @@ -78,7 +82,9 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()

Expand Down Expand Up @@ -123,7 +129,9 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()

Expand Down Expand Up @@ -152,9 +160,14 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the controller detach previously happened but the server failed to
// checkpoint, we'll get an error from the plugin but can safely ignore it.
c.c.logger.Debug("could not unpublish volume: %v", err)
return nil
}
return err
}

return nil
}

Expand Down Expand Up @@ -191,7 +204,10 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
}

err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
if err != nil {
if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the unmounting previously happened but the server failed to
// checkpoint, we'll get an error from Unmount but can safely
// ignore it.
return err
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions client/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
},
VolumeID: "foo",
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates attachmentmode",
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestCSIController_DetachVolume(t *testing.T) {
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
Expand Down
12 changes: 6 additions & 6 deletions client/pluginmanager/csimanager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package csimanager

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -317,7 +318,7 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al
// host target path was already destroyed, nothing to do here.
// this helps us in the case that a previous GC attempt cleaned
// up the volume on the node but the controller RPCs failed
return nil
rpcErr = fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}
return rpcErr
}
Expand All @@ -332,9 +333,8 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al

// We successfully removed the directory, return any rpcErrors that were
// encountered, but because we got here, they were probably flaky or were
// cleaned up externally. We might want to just return `nil` here in the
// future.
return rpcErr
// cleaned up externally.
return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}

func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
Expand All @@ -343,7 +343,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo

err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage)

if err == nil {
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
canRelease := v.usageTracker.Free(allocID, volID, usage)
if v.requiresStaging && canRelease {
err = v.unstageVolume(ctx, volID, remoteID, usage)
Expand All @@ -354,7 +354,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Unmount volume").
AddDetail("volume_id", volID)
if err == nil {
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
event.AddDetail("success", "true")
} else {
event.AddDetail("success", "false")
Expand Down
11 changes: 4 additions & 7 deletions nomad/client_csi_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package nomad

import (
"errors"
"fmt"
"math/rand"
"strings"
"time"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

// ClientCSI is used to forward RPC requests to the targeted Nomad client's
Expand Down Expand Up @@ -110,12 +111,8 @@ func (a *ClientCSI) ControllerDetachVolume(args *cstructs.ClientCSIControllerDet
// client has stopped and been GC'd, or where the controller has stopped but
// we don't have the fingerprint update yet
func (a *ClientCSI) isRetryable(err error, clientID, pluginID string) bool {
// TODO(tgross): it would be nicer to use errors.Is here but we
// need to make sure we're using error wrapping to make that work
errMsg := err.Error()
return strings.Contains(errMsg, fmt.Sprintf("Unknown node: %s", clientID)) ||
strings.Contains(errMsg, "no plugins registered for type: csi-controller") ||
strings.Contains(errMsg, fmt.Sprintf("plugin %s for type controller not found", pluginID))
return errors.Is(err, structs.ErrUnknownNode) ||
errors.Is(err, structs.ErrCSIClientRPCRetryable)
}

func (a *ClientCSI) NodeDetachVolume(args *cstructs.ClientCSINodeDetachVolumeRequest, reply *cstructs.ClientCSINodeDetachVolumeResponse) error {
Expand Down
8 changes: 2 additions & 6 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,9 +620,7 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
// we should only get this error if the Nomad node disconnects and
// is garbage-collected, so at this point we don't have any reason
// to operate as though the volume is attached to it.
if !errors.Is(err, fmt.Errorf("Unknown node: %s", claim.NodeID)) {
// TODO(tgross): need to capture case where NodeUnpublish previously
// happened but we failed to checkpoint for some reason
if !errors.Is(err, structs.ErrUnknownNode) {
return fmt.Errorf("could not detach from node: %w", err)
}
}
Expand Down Expand Up @@ -658,8 +656,6 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
err := v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
// TODO(tgross): need to capture case where ControllerUnpublish previously
// happened but we failed to checkpoint for some reason
return fmt.Errorf("could not detach from controller: %v", err)
}
claim.State = structs.CSIVolumeClaimStateReadyToFree
Expand Down Expand Up @@ -700,7 +696,7 @@ func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.
// get the storage provider's ID for the client node (not
// Nomad's ID for the node)
targetCSIInfo, ok := targetNode.CSINodePlugins[vol.PluginID]
if !ok {
if !ok || targetCSIInfo.NodeInfo == nil {
return "", fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", targetNode.ID, vol.PluginID)
}
return targetCSIInfo.NodeInfo.ID, nil
Expand Down
5 changes: 5 additions & 0 deletions nomad/structs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var (
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
ErrMissingAllocID = errors.New(errMissingAllocID)

ErrUnknownNode = errors.New(ErrUnknownNodePrefix)

ErrDeploymentTerminalNoCancel = errors.New(errDeploymentTerminalNoCancel)
ErrDeploymentTerminalNoFail = errors.New(errDeploymentTerminalNoFail)
ErrDeploymentTerminalNoPause = errors.New(errDeploymentTerminalNoPause)
Expand All @@ -61,6 +63,9 @@ var (
ErrDeploymentTerminalNoRun = errors.New(errDeploymentTerminalNoRun)
ErrDeploymentTerminalNoSetHealth = errors.New(errDeploymentTerminalNoSetHealth)
ErrDeploymentRunningNoUnblock = errors.New(errDeploymentRunningNoUnblock)

ErrCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)")
ErrCSIClientRPCRetryable = errors.New("CSI client error (retryable)")
)

// IsErrNoLeader returns whether the error is due to there being no leader.
Expand Down
2 changes: 1 addition & 1 deletion nomad/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func getNodeForRpc(snap *state.StateSnapshot, nodeID string) (*structs.Node, err
}

if node == nil {
return nil, fmt.Errorf("Unknown node %q", nodeID)
return nil, fmt.Errorf("%w %s", structs.ErrUnknownNode, nodeID)
}

if err := nodeSupportsRpc(node); err != nil {
Expand Down
15 changes: 11 additions & 4 deletions plugins/csi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"google.golang.org/grpc"
Expand Down Expand Up @@ -314,8 +315,12 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q or node %q could not be found: %v",
req.ExternalID, req.NodeID, err)
// we'll have validated the volume and node *should* exist at the
// server, so if we get a not-found here it's because we've previously
// checkpointed. we'll return an error so the caller can log it for
// diagnostic purposes.
err = fmt.Errorf("%w: volume %q or node %q could not be found: %v",
structs.ErrCSIClientRPCIgnorable, req.ExternalID, req.NodeID, err)
case codes.Internal:
err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
Expand Down Expand Up @@ -558,7 +563,8 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
err = fmt.Errorf("%w: volume %q could not be found: %v",
structs.ErrCSIClientRPCIgnorable, volumeID, err)
case codes.Internal:
err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
Expand Down Expand Up @@ -630,7 +636,8 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
err = fmt.Errorf("%w: volume %q could not be found: %v",
structs.ErrCSIClientRPCIgnorable, volumeID, err)
case codes.Internal:
err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
Expand Down