Skip to content

Commit

Permalink
Merge pull request kubernetes-csi#16 from jsafrane/fixed-controllerun…
Browse files Browse the repository at this point in the history
…publish-ocp

Bug 1737788: UPSTREAM: 169: Fixed ControllerUnpublish error handling
  • Loading branch information
openshift-merge-robot authored Aug 13, 2019
2 parents 0a9ccf6 + 932db87 commit d102948
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 92 deletions.
29 changes: 19 additions & 10 deletions pkg/attacher/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ type Attacher interface {
// status.
Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, attributes, secrets map[string]string) (metadata map[string]string, detached bool, err error)

// Detach given volume from given node. Note that "detached" is returned on
// error and means that the volume is for sure detached from the node.
// "false" means that the volume may or may not be detached and caller
// should retry.
Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error)
// Detach given volume from given node.
Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) error
}

type attacher struct {
Expand Down Expand Up @@ -79,7 +76,7 @@ func (a *attacher) Attach(ctx context.Context, volumeID string, readOnly bool, n
return rsp.PublishContext, false, nil
}

func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) {
func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) error {
client := csi.NewControllerClient(a.conn)

req := csi.ControllerUnpublishVolumeRequest{
Expand All @@ -88,11 +85,14 @@ func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, s
Secrets: secrets,
}

_, err = client.ControllerUnpublishVolume(ctx, &req)
if err != nil {
return isFinalError(err), err
_, err := client.ControllerUnpublishVolume(ctx, &req)
if err != nil && isNotFound(err) {
// Do not change behavior of NotFound in stable branches.
// See https://github.com/kubernetes-csi/external-attacher/pull/165 and
// https://github.com/container-storage-interface/spec/pull/375
return nil
}
return true, nil
return err
}

func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand Down Expand Up @@ -131,3 +131,12 @@ func isFinalError(err error) bool {
// even start or failed. It is for sure not in progress.
return true
}

func isNotFound(err error) bool {
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error.
return false
}
return st.Code() == codes.NotFound
}
90 changes: 46 additions & 44 deletions pkg/attacher/attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,54 +291,59 @@ func TestDetachAttach(t *testing.T) {
}

tests := []struct {
name string
volumeID string
nodeID string
secrets map[string]string
input *csi.ControllerUnpublishVolumeRequest
output *csi.ControllerUnpublishVolumeResponse
injectError codes.Code
expectError bool
expectDetached bool
name string
volumeID string
nodeID string
secrets map[string]string
input *csi.ControllerUnpublishVolumeRequest
output *csi.ControllerUnpublishVolumeResponse
injectError codes.Code
expectError bool
}{
{
name: "success",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
expectDetached: true,
name: "success",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
},
{
name: "secrets",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
secrets: map[string]string{"foo": "bar"},
input: secretsRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
expectDetached: true,
name: "secrets",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
secrets: map[string]string{"foo": "bar"},
input: secretsRequest,
output: &csi.ControllerUnpublishVolumeResponse{},
expectError: false,
},
{
name: "gRPC final error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.NotFound,
expectError: true,
expectDetached: true,
name: "gRPC final error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.PermissionDenied,
expectError: true,
},
{
name: "gRPC transient error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.DeadlineExceeded,
expectError: true,
expectDetached: false,
name: "gRPC transient error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.DeadlineExceeded,
expectError: true,
},
{
// Explicitly test NotFound, it should be ignored.
name: "gRPC NotFound error",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
input: defaultRequest,
output: nil,
injectError: codes.NotFound,
expectError: false,
},
}

Expand All @@ -365,15 +370,12 @@ func TestDetachAttach(t *testing.T) {
}

a := NewAttacher(csiConn)
detached, err := a.Detach(context.Background(), test.volumeID, test.nodeID, test.secrets)
err := a.Detach(context.Background(), test.volumeID, test.nodeID, test.secrets)
if test.expectError && err == nil {
t.Errorf("test %q: Expected error, got none", test.name)
}
if !test.expectError && err != nil {
t.Errorf("test %q: got error: %v", test.name, err)
}
if detached != test.expectDetached {
t.Errorf("test %q: expected detached=%v, got %v", test.name, test.expectDetached, detached)
}
}
}
12 changes: 4 additions & 8 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"k8s.io/klog"

"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -359,17 +359,13 @@ func (h *csiHandler) csiDetach(va *storage.VolumeAttachment) (*storage.VolumeAtt

ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
detached, err := h.attacher.Detach(ctx, volumeHandle, nodeID, secrets)
if err != nil && !detached {
err = h.attacher.Detach(ctx, volumeHandle, nodeID, secrets)
if err != nil {
// The volume may not be fully detached. Save the error and try again
// after backoff.
return va, err
}
if err != nil {
klog.V(2).Infof("Detached %q with error %s", va.Name, err.Error())
} else {
klog.V(2).Infof("Detached %q", va.Name)
}
klog.V(2).Infof("Detached %q", va.Name)

if va, err := markAsDetached(h.client, va); err != nil {
return va, fmt.Errorf("could not mark as detached: %s", err)
Expand Down
41 changes: 15 additions & 26 deletions pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/kubernetes-csi/external-attacher/pkg/attacher"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -244,10 +244,10 @@ func TestCSIHandler(t *testing.T) {
var noAttrs map[string]string
var noSecrets map[string]string
var notDetached = false
var detached = true
var success error
var readWrite = false
var readOnly = true
var ignored = false // the value is irrelevant for given call

tests := []testCase{
//
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -645,7 +645,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -657,7 +657,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -671,16 +671,16 @@ func TestCSIHandler(t *testing.T) {
expectedCSICalls: []csiCall{},
},
{
name: "CSI detach fails with transient error -> controller retries",
name: "CSI detach fails with an error -> controller retries",
initialObjects: []runtime.Object{pvWithFinalizer(), node()},
addedVA: deleted(va(true, fin, ann)),
expectedActions: []core.Action{
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithDetachError(deleted(va(true /*attached*/, fin, ann)), "mock error")),
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, fmt.Errorf("mock error"), notDetached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, fmt.Errorf("mock error"), ignored, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -691,19 +691,8 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 500 * time.Millisecond},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, time.Duration(0)},
},
},
{
name: "CSI detach fails with final error -> controller does not retry",
initialObjects: []runtime.Object{pvWithFinalizer(), node()},
addedVA: deleted(va(true, fin, ann)),
expectedActions: []core.Action{
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, fmt.Errorf("mock error"), detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 500 * time.Millisecond},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, time.Duration(0)},
},
},
{
Expand Down Expand Up @@ -773,7 +762,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, "annotatedNodeID", noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, "annotatedNodeID", noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -784,7 +773,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, "", map[string]string{vaNodeIDAnnotation: "annotatedNodeID"}))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand Down Expand Up @@ -816,8 +805,8 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false, "", ann))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
{
Expand All @@ -830,7 +819,7 @@ func TestCSIHandler(t *testing.T) {
},
expectedCSICalls: []csiCall{
{"detach", "projects/UNSPECIFIED/zones/testZone/disks/testpd", testNodeID,
map[string]string{"partition": "0"}, noSecrets, readWrite, success, detached, noMetadata, 0},
map[string]string{"partition": "0"}, noSecrets, readWrite, success, ignored, noMetadata, 0},
},
},
//
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ func (f *fakeCSIConnection) Attach(ctx context.Context, volumeID string, readOnl
return call.metadata, call.detached, call.err
}

func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (bool, error) {
func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) error {
if f.index >= len(f.calls) {
f.t.Errorf("Unexpected CSI Detach call: volume=%s, node=%s, index: %d, calls: %+v", volumeID, nodeID, f.index, f.calls)
return true, fmt.Errorf("unexpected call")
return fmt.Errorf("unexpected call")
}
call := f.calls[f.index]
f.index++
Expand Down Expand Up @@ -437,9 +437,9 @@ func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID
}

if err != nil {
return true, err
return err
}
return call.detached, call.err
return call.err
}

func (f *fakeCSIConnection) Close() error {
Expand Down

0 comments on commit d102948

Please sign in to comment.