diff --git a/Gopkg.lock b/Gopkg.lock index dc1c02953d..10956d2adc 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -836,6 +836,8 @@ "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake", "github.com/spf13/pflag", "google.golang.org/grpc", + "google.golang.org/grpc/codes", + "google.golang.org/grpc/status", "k8s.io/api/core/v1", "k8s.io/api/storage/v1beta1", "k8s.io/apimachinery/pkg/api/resource", diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b97159dcaf..f3760a5d91 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -45,6 +45,8 @@ import ( "k8s.io/klog" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type deprecatedSecretParamsMap struct { @@ -163,6 +165,7 @@ type csiProvisioner struct { var _ controller.Provisioner = &csiProvisioner{} var _ controller.BlockProvisioner = &csiProvisioner{} +var _ controller.ProvisionerExt = &csiProvisioner{} var ( // Each provisioner have a identify string to distinguish with others. This @@ -328,6 +331,12 @@ func getVolumeCapability( } func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) { + // The controller should call ProvisionExt() instead, but just in case... + pv, _, err := p.ProvisionExt(options) + return pv, err +} + +func (p *csiProvisioner) ProvisionExt(options controller.VolumeOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) { createVolumeRequestParameters := options.Parameters migratedVolume := false if p.supportsMigrationFromInTreePluginName != "" { @@ -339,13 +348,13 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis // so that external provisioner can correctly pick up the PVC pointing to an in-tree plugin storageClass, err := p.client.StorageV1().StorageClasses().Get(*storageClassName, metav1.GetOptions{}) if err != nil { - return nil, fmt.Errorf("failed to get storage class named %s: %v", *storageClassName, err) + return nil, controller.ProvisioningFinished, fmt.Errorf("failed to get storage class named %s: %v", *storageClassName, err) } if storageClass.Provisioner == p.supportsMigrationFromInTreePluginName { klog.V(2).Infof("translating storage class parameters for in-tree plugin %s to CSI", storageClass.Provisioner) createVolumeRequestParameters, err = csitranslationlib.TranslateInTreeStorageClassParametersToCSI(p.supportsMigrationFromInTreePluginName, options.Parameters) if err != nil { - return nil, fmt.Errorf("failed to translate storage class parameters: %v", err) + return nil, controller.ProvisioningFinished, fmt.Errorf("failed to translate storage class parameters: %v", err) } migratedVolume = true } else { @@ -357,28 +366,28 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis if options.PVC.Spec.DataSource != nil { // PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object if options.PVC.Spec.DataSource.Name == "" { - return nil, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name) + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name) } if options.PVC.Spec.DataSource.Kind != snapshotKind { - return nil, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind) + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source is not the right type. Expected %s, Got %s", snapshotKind, options.PVC.Spec.DataSource.Kind) } if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup { - return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup)) + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup)) } needSnapshotSupport = true } if err := p.checkDriverCapabilities(needSnapshotSupport); err != nil { - return nil, err + return nil, controller.ProvisioningFinished, err } if options.PVC.Spec.Selector != nil { - return nil, fmt.Errorf("claim Selector is not supported") + return nil, controller.ProvisioningFinished, fmt.Errorf("claim Selector is not supported") } pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength) if err != nil { - return nil, err + return nil, controller.ProvisioningFinished, err } fsTypesFound := 0 @@ -393,7 +402,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis } } if fsTypesFound > 1 { - return nil, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey) + return nil, controller.ProvisioningFinished, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey) } if len(fsType) == 0 { fsType = defaultFSType @@ -421,7 +430,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis if needSnapshotSupport { volumeContentSource, err := p.getVolumeContentSource(options) if err != nil { - return nil, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err) + return nil, controller.ProvisioningNoChange, fmt.Errorf("error getting snapshot handle for snapshot %s: %v", options.PVC.Spec.DataSource.Name, err) } req.VolumeContentSource = volumeContentSource } @@ -434,7 +443,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis options.AllowedTopologies, options.SelectedNode) if err != nil { - return nil, fmt.Errorf("error generating accessibility requirements: %v", err) + return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err) } req.AccessibilityRequirements = requirements } @@ -447,31 +456,31 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis // No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time. provisionerSecretRef, err := getSecretReference(provisionerSecretParams, createVolumeRequestParameters, pvName, nil) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } req.Secrets = provisionerCredentials // Resolve controller publish, node stage, node publish secret references controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, createVolumeRequestParameters, pvName, options.PVC) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, createVolumeRequestParameters, pvName, options.PVC) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, createVolumeRequestParameters, pvName, options.PVC) if err != nil { - return nil, err + return nil, controller.ProvisioningNoChange, err } req.Parameters, err = removePrefixedParameters(createVolumeRequestParameters) if err != nil { - return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err) + return nil, controller.ProvisioningFinished, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), p.timeout) @@ -479,7 +488,10 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis rep, err = p.csiClient.CreateVolume(ctx, &req) if err != nil { - return nil, err + if isFinalError(err) { + return nil, controller.ProvisioningFinished, err + } + return nil, controller.ProvisioningInBackground, err } if rep.Volume != nil { @@ -502,7 +514,8 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis if err != nil { capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err) } - return nil, capErr + // use InBackground to retry the call, hoping the volume is deleted correctly next time. + return nil, controller.ProvisioningInBackground, capErr } pv := &v1.PersistentVolume{ @@ -547,14 +560,19 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis pv, err = csitranslationlib.TranslateCSIPVToInTree(pv) if err != nil { klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err) - p.Delete(pv) - return nil, err + deleteErr := p.Delete(pv) + if deleteErr != nil { + klog.Warningf("failed to delete partly provisioned PV: %v", deleteErr) + // Retry the call again to clean up the orphan + return nil, controller.ProvisioningInBackground, err + } + return nil, controller.ProvisioningFinished, err } } klog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource) - return pv, nil + return pv, controller.ProvisioningFinished, nil } func (p *csiProvisioner) supportsTopology() bool { @@ -903,3 +921,27 @@ func deprecationWarning(deprecatedParam, newParam, removalVersion string) string } return fmt.Sprintf("\"%s\" is deprecated and will be removed in %s%s", deprecatedParam, removalVersion, newParamPhrase) } + +func isFinalError(err error) bool { + // Sources: + // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md + // https://github.com/container-storage-interface/spec/blob/master/spec.md + st, ok := status.FromError(err) + if !ok { + // This is not gRPC error. The operation must have failed before gRPC + // method was called, otherwise we would get gRPC error. + // We don't know if any previous CreateVolume is in progress, be on the safe side. + return false + } + switch st.Code() { + case codes.Canceled, // gRPC: Client Application cancelled the request + codes.DeadlineExceeded, // gRPC: Timeout + codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous CreateVolume() may be still in progress. + codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous CreateVolume() may be still in progress. + codes.Aborted: // CSI: Operation pending for volume + return false + } + // All other errors mean that provisioning either did not + // even start or failed. It is for sure not in progress. + return true +} diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index a2f6f37ac4..32042518ff 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -35,6 +35,8 @@ import ( crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/fake" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" storage "k8s.io/api/storage/v1beta1" @@ -636,7 +638,9 @@ type provisioningTestcase struct { volWithLessCap bool expectedPVSpec *pvSpec withSecretRefs bool + createVolumeError error expectErr bool + expectState controller.ProvisioningState expectCreateVolDo interface{} } @@ -676,6 +680,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "multiple fsType provision": { volOpts: controller.VolumeOptions{ @@ -687,7 +692,8 @@ func TestProvision(t *testing.T) { prefixedFsTypeKey: "ext4", }, }, - expectErr: true, + expectErr: true, + expectState: controller.ProvisioningFinished, }, "provision with prefixed FS Type key": { volOpts: controller.VolumeOptions{ @@ -718,6 +724,7 @@ func TestProvision(t *testing.T) { t.Errorf("Parameters should have been stripped") } }, + expectState: controller.ProvisioningFinished, }, "provision with access mode multi node multi writer": { volOpts: controller.VolumeOptions{ @@ -766,6 +773,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected multi_node_multi_writer") } }, + expectState: controller.ProvisioningFinished, }, "provision with access mode multi node multi readonly": { volOpts: controller.VolumeOptions{ @@ -814,6 +822,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected multi_node_reader_only") } }, + expectState: controller.ProvisioningFinished, }, "provision with access mode single writer": { volOpts: controller.VolumeOptions{ @@ -862,6 +871,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected single_node_writer") } }, + expectState: controller.ProvisioningFinished, }, "provision with multiple access modes": { volOpts: controller.VolumeOptions{ @@ -916,6 +926,7 @@ func TestProvision(t *testing.T) { t.Errorf("Expected single_node_writer") } }, + expectState: controller.ProvisioningFinished, }, "provision with secrets": { volOpts: controller.VolumeOptions{ @@ -950,6 +961,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "provision with volume mode(Filesystem)": { volOpts: controller.VolumeOptions{ @@ -972,6 +984,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "provision with volume mode(Block)": { volOpts: controller.VolumeOptions{ @@ -993,6 +1006,7 @@ func TestProvision(t *testing.T) { }, }, }, + expectState: controller.ProvisioningFinished, }, "fail to get secret reference": { volOpts: controller.VolumeOptions{ @@ -1002,6 +1016,7 @@ func TestProvision(t *testing.T) { }, getSecretRefErr: true, expectErr: true, + expectState: controller.ProvisioningNoChange, }, "fail not nil selector": { volOpts: controller.VolumeOptions{ @@ -1010,6 +1025,7 @@ func TestProvision(t *testing.T) { }, notNilSelector: true, expectErr: true, + expectState: controller.ProvisioningFinished, }, "fail to make volume name": { volOpts: controller.VolumeOptions{ @@ -1018,6 +1034,7 @@ func TestProvision(t *testing.T) { }, makeVolumeNameErr: true, expectErr: true, + expectState: controller.ProvisioningFinished, }, "fail to get credentials": { volOpts: controller.VolumeOptions{ @@ -1027,6 +1044,7 @@ func TestProvision(t *testing.T) { }, getCredentialsErr: true, expectErr: true, + expectState: controller.ProvisioningNoChange, }, "fail vol with less capacity": { volOpts: controller.VolumeOptions{ @@ -1036,6 +1054,7 @@ func TestProvision(t *testing.T) { }, volWithLessCap: true, expectErr: true, + expectState: controller.ProvisioningInBackground, }, "provision with mount options": { volOpts: controller.VolumeOptions{ @@ -1092,6 +1111,59 @@ func TestProvision(t *testing.T) { t.Errorf("Expected 2 mount options") } }, + expectState: controller.ProvisioningFinished, + }, + "provision with final error": { + volOpts: controller.VolumeOptions{ + PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete, + PVName: "test-name", + PVC: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + UID: "testid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Selector: nil, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)), + }, + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + createVolumeError: status.Error(codes.Unauthenticated, "Mock final error"), + expectCreateVolDo: func(ctx context.Context, req *csi.CreateVolumeRequest) { + // intentionally empty + }, + expectErr: true, + expectState: controller.ProvisioningFinished, + }, + "provision with transient error": { + volOpts: controller.VolumeOptions{ + PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete, + PVName: "test-name", + PVC: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + UID: "testid", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Selector: nil, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestedBytes, 10)), + }, + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + }, + }, + }, + createVolumeError: status.Error(codes.DeadlineExceeded, "Mock timeout"), + expectCreateVolDo: func(ctx context.Context, req *csi.CreateVolumeRequest) { + // intentionally empty + }, + expectErr: true, + expectState: controller.ProvisioningInBackground, }, } @@ -1194,18 +1266,18 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested tc.volOpts.Parameters[provisionerSecretNamespaceKey] = "default" } else if tc.volWithLessCap { out.Volume.CapacityBytes = int64(80) - controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1) - controllerServer.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&csi.DeleteVolumeResponse{}, nil).Times(1) + controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, tc.createVolumeError).Times(1) + controllerServer.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&csi.DeleteVolumeResponse{}, tc.createVolumeError).Times(1) } else if tc.expectCreateVolDo != nil { - controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Do(tc.expectCreateVolDo).Return(out, nil).Times(1) + controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Do(tc.expectCreateVolDo).Return(out, tc.createVolumeError).Times(1) } else { // Setup regular mock call expectations. if !tc.expectErr { - controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, nil).Times(1) + controllerServer.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(out, tc.createVolumeError).Times(1) } } - pv, err := csiProvisioner.Provision(tc.volOpts) + pv, state, err := csiProvisioner.(controller.ProvisionerExt).ProvisionExt(tc.volOpts) if tc.expectErr && err == nil { t.Errorf("test %q: Expected error, got none", k) } @@ -1213,6 +1285,13 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested t.Errorf("test %q: got error: %v", k, err) } + if tc.expectState == "" { + tc.expectState = controller.ProvisioningFinished + } + if tc.expectState != state { + t.Errorf("test %q: expected ProvisioningState %s, got %s", k, tc.expectState, state) + } + if tc.expectedPVSpec != nil { if pv.Name != tc.expectedPVSpec.Name { t.Errorf("test %q: expected PV name: %q, got: %q", k, tc.expectedPVSpec.Name, pv.Name)