
Commit

csi: client RPCs should return wrapped errors for checking
tgross committed Aug 7, 2020
1 parent 862691d commit 3366aa9
Showing 8 changed files with 52 additions and 31 deletions.
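The change relies on Go 1.13-style error wrapping: the client RPC handlers wrap a sentinel error with fmt.Errorf("%w: %v", ...) so callers can classify the failure with errors.Is instead of matching on message text. Below is a minimal, self-contained sketch of that pattern; the sentinel values mirror the ones added in nomad/structs/errors.go, but findPlugin and attachVolume are hypothetical stand-ins, not the Nomad code.

package main

import (
	"errors"
	"fmt"
)

// Sentinel errors, analogous to those added in nomad/structs/errors.go.
var (
	ErrCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)")
	ErrCSIClientRPCRetryable = errors.New("CSI client error (retryable)")
)

// findPlugin is a hypothetical lookup that fails the way findControllerPlugin
// does when the server's view of plugin health is stale.
func findPlugin(id string) error {
	return fmt.Errorf("plugin %s for type csi-controller not found", id)
}

// attachVolume wraps the lookup failure in the retryable sentinel while
// keeping the original message, mirroring the client RPC changes.
func attachVolume(pluginID string) error {
	if err := findPlugin(pluginID); err != nil {
		return fmt.Errorf("%w: %v", ErrCSIClientRPCRetryable, err)
	}
	return nil
}

func main() {
	err := attachVolume("some-garbage")
	fmt.Println(err)
	// CSI client error (retryable): plugin some-garbage for type csi-controller not found
	fmt.Println(errors.Is(err, ErrCSIClientRPCRetryable)) // true
	fmt.Println(errors.Is(err, ErrCSIClientRPCIgnorable)) // false
}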
26 changes: 21 additions & 5 deletions client/csi_endpoint.go
@@ -3,13 +3,15 @@ package client
import (
"context"
"errors"
"fmt"
"time"

metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
)

@@ -46,7 +48,9 @@ func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateV

plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()

@@ -78,7 +82,9 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()

@@ -123,7 +129,9 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()

@@ -152,9 +160,14 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the controller detach previously happened but the server failed to
// checkpoint, we'll get an error from the plugin but can safely ignore it.
c.c.logger.Debug("could not unpublish volume", "error", err)
return nil
}
return err
}

return nil
}

@@ -191,7 +204,10 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
}

err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
if err != nil {
if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the unmounting previously happened but the server failed to
// checkpoint, we'll get an error from Unmount but can safely
// ignore it.
return err
}
return nil
6 changes: 3 additions & 3 deletions client/csi_endpoint_test.go
@@ -41,7 +41,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
@@ -198,7 +198,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
},
VolumeID: "foo",
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates attachmentmode",
@@ -287,7 +287,7 @@ func TestCSIController_DetachVolume(t *testing.T) {
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
12 changes: 6 additions & 6 deletions client/pluginmanager/csimanager/volume.go
@@ -2,6 +2,7 @@ package csimanager

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -317,7 +318,7 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al
// host target path was already destroyed, nothing to do here.
// this helps us in the case that a previous GC attempt cleaned
// up the volume on the node but the controller RPCs failed
return nil
rpcErr = fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}
return rpcErr
}
@@ -332,9 +333,8 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al

// We successfully removed the directory, return any rpcErrors that were
// encountered, but because we got here, they were probably flaky or was
// cleaned up externally. We might want to just return `nil` here in the
// future.
return rpcErr
// cleaned up externally.
return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}

func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
@@ -343,7 +343,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo

err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage)

if err == nil {
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
canRelease := v.usageTracker.Free(allocID, volID, usage)
if v.requiresStaging && canRelease {
err = v.unstageVolume(ctx, volID, remoteID, usage)
@@ -354,7 +354,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Unmount volume").
AddDetail("volume_id", volID)
if err == nil {
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
event.AddDetail("success", "true")
} else {
event.AddDetail("success", "false")
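The volume manager now treats an error wrapped in the ignorable sentinel as success for bookkeeping: the claim is still released and the node event still reports success when the unpublish already happened on an earlier attempt. A rough, self-contained sketch of that control flow, with a hypothetical unmountVolume helper standing in for volumeManager.UnmountVolume:

package main

import (
	"errors"
	"fmt"
)

var errCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)")

// unmountVolume sketches the UnmountVolume flow: an error wrapped in the
// ignorable sentinel still releases the claim and is reported as success;
// any other error propagates to the caller.
func unmountVolume(unpublish func() error, release func()) error {
	err := unpublish()
	if err == nil || errors.Is(err, errCSIClientRPCIgnorable) {
		release()
		return nil
	}
	return err
}

func main() {
	// The unpublish already happened on an earlier attempt, so the plugin
	// reports not-found and the manager wraps it in the ignorable sentinel.
	stale := func() error {
		return fmt.Errorf("%w: volume %q could not be found", errCSIClientRPCIgnorable, "vol-1")
	}
	err := unmountVolume(stale, func() { fmt.Println("claim released") })
	fmt.Println("returned error:", err) // returned error: <nil>
}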
11 changes: 4 additions & 7 deletions nomad/client_csi_endpoint.go
@@ -1,15 +1,16 @@
package nomad

import (
"errors"
"fmt"
"math/rand"
"strings"
"time"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

// ClientCSI is used to forward RPC requests to the targeted Nomad client's
@@ -110,12 +111,8 @@ func (a *ClientCSI) ControllerDetachVolume(args *cstructs.ClientCSIControllerDet
// client has stopped and been GC'd, or where the controller has stopped but
// we don't have the fingerprint update yet
func (a *ClientCSI) isRetryable(err error, clientID, pluginID string) bool {
// TODO(tgross): it would be nicer to use errors.Is here but we
// need to make sure we're using error wrapping to make that work
errMsg := err.Error()
return strings.Contains(errMsg, fmt.Sprintf("Unknown node: %s", clientID)) ||
strings.Contains(errMsg, "no plugins registered for type: csi-controller") ||
strings.Contains(errMsg, fmt.Sprintf("plugin %s for type controller not found", pluginID))
return errors.Is(err, structs.ErrUnknownNode) ||
errors.Is(err, structs.ErrCSIClientRPCRetryable)
}

func (a *ClientCSI) NodeDetachVolume(args *cstructs.ClientCSINodeDetachVolumeRequest, reply *cstructs.ClientCSINodeDetachVolumeResponse) error {
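With the sentinels in place, the server-side retry check no longer pattern-matches error strings; errors.Is walks the wrap chain, so the check keeps working when callers add more context around the error. A hedged sketch of that check, with simplified local sentinels standing in for the structs package:

package main

import (
	"errors"
	"fmt"
)

var (
	errUnknownNode           = errors.New("Unknown node")
	errCSIClientRPCRetryable = errors.New("CSI client error (retryable)")
)

// isRetryable mirrors the rewritten ClientCSI.isRetryable: instead of
// matching message text, it asks whether either sentinel appears anywhere
// in the error's wrap chain.
func isRetryable(err error) bool {
	return errors.Is(err, errUnknownNode) ||
		errors.Is(err, errCSIClientRPCRetryable)
}

func main() {
	// The node lookup wraps the sentinel, and a later layer wraps it again
	// with more context; the check still matches.
	base := fmt.Errorf("%w %s", errUnknownNode, "9d93e92f")
	wrapped := fmt.Errorf("controller detach failed: %w", base)
	fmt.Println(isRetryable(wrapped)) // true

	// A bare string error without the sentinel in its chain does not match.
	fmt.Println(isRetryable(errors.New("Unknown node: 9d93e92f"))) // false
}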
6 changes: 1 addition & 5 deletions nomad/csi_endpoint.go
@@ -606,8 +606,6 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
// is garbage-collected, so at this point we don't have any reason
// to operate as though the volume is attached to it.
if !errors.Is(err, fmt.Errorf("Unknown node: %s", claim.NodeID)) {
// TODO(tgross): need to capture case where NodeUnpublish previously
// happened but we failed to checkpoint for some reason
return fmt.Errorf("could not detach from node: %w", err)
}
}
@@ -662,8 +660,6 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
err = v.srv.RPC("ClientCSI.ControllerDetachVolume", req,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
// TODO(tgross): need to capture case where ControllerUnpublish previously
// happened but we failed to checkpoint for some reason
return fmt.Errorf("could not detach from controller: %v", err)
}
claim.State = structs.CSIVolumeClaimStateReadyToFree
@@ -704,7 +700,7 @@ func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.
// get the storage provider's ID for the client node (not
// Nomad's ID for the node)
targetCSIInfo, ok := targetNode.CSINodePlugins[vol.PluginID]
if !ok {
if !ok || targetCSIInfo.NodeInfo == nil {
return "", fmt.Errorf("failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client", targetNode.ID, vol.PluginID)
}
return targetCSIInfo.NodeInfo.ID, nil
5 changes: 5 additions & 0 deletions nomad/structs/errors.go
@@ -52,6 +52,8 @@ var (
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
ErrMissingAllocID = errors.New(errMissingAllocID)

ErrUnknownNode = errors.New(ErrUnknownNodePrefix)

ErrDeploymentTerminalNoCancel = errors.New(errDeploymentTerminalNoCancel)
ErrDeploymentTerminalNoFail = errors.New(errDeploymentTerminalNoFail)
ErrDeploymentTerminalNoPause = errors.New(errDeploymentTerminalNoPause)
@@ -61,6 +63,9 @@ var (
ErrDeploymentTerminalNoRun = errors.New(errDeploymentTerminalNoRun)
ErrDeploymentTerminalNoSetHealth = errors.New(errDeploymentTerminalNoSetHealth)
ErrDeploymentRunningNoUnblock = errors.New(errDeploymentRunningNoUnblock)

ErrCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)")
ErrCSIClientRPCRetryable = errors.New("CSI client error (retryable)")
)

// IsErrNoLeader returns whether the error is due to there being no leader.
2 changes: 1 addition & 1 deletion nomad/util.go
@@ -246,7 +246,7 @@ func getNodeForRpc(snap *state.StateSnapshot, nodeID string) (*structs.Node, err
}

if node == nil {
return nil, fmt.Errorf("Unknown node %q", nodeID)
return nil, fmt.Errorf("%w %s", structs.ErrUnknownNode, nodeID)
}

if err := nodeSupportsRpc(node); err != nil {
15 changes: 11 additions & 4 deletions plugins/csi/client.go
@@ -12,6 +12,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"google.golang.org/grpc"
@@ -314,8 +315,12 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q or node %q could not be found: %v",
req.ExternalID, req.NodeID, err)
// we'll have validated the volume and node *should* exist at the
// server, so if we get a not-found here it's because we've previously
// checkpointed. we'll return an error so the caller can log it for
// diagnostic purposes.
err = fmt.Errorf("%w: volume %q or node %q could not be found: %v",
structs.ErrCSIClientRPCIgnorable, req.ExternalID, req.NodeID, err)
case codes.Internal:
err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
@@ -558,7 +563,8 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
err = fmt.Errorf("%w: volume %q could not be found: %v",
structs.ErrCSIClientRPCIgnorable, volumeID, err)
case codes.Internal:
err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
@@ -630,7 +636,8 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s
code := status.Code(err)
switch code {
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
err = fmt.Errorf("%w: volume %q could not be found: %v",
structs.ErrCSIClientRPCIgnorable, volumeID, err)
case codes.Internal:
err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
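At the plugin client boundary, a gRPC NotFound status is now translated into an error wrapped in the ignorable sentinel so upper layers can classify it without parsing the message. A small sketch of that translation using grpc-go's status and codes packages; classifyUnpublishError is an illustrative helper, not the real client method:

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

var errCSIClientRPCIgnorable = errors.New("CSI client error (ignorable)")

// classifyUnpublishError sketches the switch added around the plugin RPCs:
// NotFound means the unpublish already happened, so the error is wrapped in
// the ignorable sentinel; Internal gets extra context; anything else passes
// through unchanged.
func classifyUnpublishError(err error, volumeID string) error {
	if err == nil {
		return nil
	}
	switch status.Code(err) {
	case codes.NotFound:
		return fmt.Errorf("%w: volume %q could not be found: %v",
			errCSIClientRPCIgnorable, volumeID, err)
	case codes.Internal:
		return fmt.Errorf("plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
	default:
		return err
	}
}

func main() {
	rpcErr := status.Error(codes.NotFound, "no such volume")
	err := classifyUnpublishError(rpcErr, "vol-1")
	fmt.Println(errors.Is(err, errCSIClientRPCIgnorable)) // true
}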
