From 80a167ae155f85db5adaef12d78977e140ab9a96 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Thu, 3 Oct 2019 23:27:20 -0400 Subject: [PATCH 1/7] check that the PVC provisioning request is for the current node --- cmd/csi-provisioner/csi-provisioner.go | 2 ++ pkg/controller/controller.go | 9 +++++++++ pkg/controller/controller_test.go | 22 +++++++++++----------- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index b017139701..5f088a2582 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -89,6 +89,7 @@ var ( capacityImmediateBinding = flag.Bool("capacity-for-immediate-binding", false, "Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging.") capacityPollInterval = flag.Duration("capacity-poll-interval", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.") capacityOwnerrefLevel = flag.Int("capacity-ownerref-level", 1, "The level indicates the number of objects that need to be traversed starting from the pod identified by the POD_NAME and POD_NAMESPACE environment variables to reach the owning object for CSIStorageCapacity objects: 0 for the pod itself, 1 for a StatefulSet, 2 for a Deployment, etc.") + enableNodeCheck = flag.Bool("enable-node-check", false, "Enables a check to see that the node selected by the scheduler for provisioning is this node.") featureGates map[string]bool provisionController *controller.ProvisionController @@ -292,6 +293,7 @@ func main() { vaLister, *extraCreateMetadata, *defaultFSType, + *enableNodeCheck, ) provisionController = controller.NewProvisionController( diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 4c878557fe..16f7a629ae 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -227,6 +227,7 @@ type csiProvisioner struct { vaLister storagelistersv1.VolumeAttachmentLister extraCreateMetadata bool eventRecorder record.EventRecorder + enableNodeCheck bool } var _ controller.Provisioner = &csiProvisioner{} @@ -297,6 +298,7 @@ func NewCSIProvisioner(client kubernetes.Interface, vaLister storagelistersv1.VolumeAttachmentLister, extraCreateMetadata bool, defaultFSType string, + enableNodeCheck bool, ) controller.Provisioner { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -328,6 +330,7 @@ func NewCSIProvisioner(client kubernetes.Interface, vaLister: vaLister, extraCreateMetadata: extraCreateMetadata, eventRecorder: eventRecorder, + enableNodeCheck: enableNodeCheck, } return provisioner } @@ -449,6 +452,12 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi } + if p.enableNodeCheck && options.SelectedNode.Name != os.Getenv("NODE_NAME") { + return nil, controller.ProvisioningNoChange, &controller.IgnoredError{ + Reason: fmt.Sprintf("Selected node (%s) is not current node (%s)", options.SelectedNode.Name, os.Getenv("NODE_NAME")), + } + } + migratedVolume := false if p.supportsMigrationFromInTreePluginName != "" { // NOTE: we cannot depend on PVC.Annotations[volume.beta.kubernetes.io/storage-provisioner] to get diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 9ceecde24c..e656dc0b0d 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -413,7 +413,7 @@ func 
TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", - 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType) + 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, false) // Requested PVC with requestedBytes storage deletePolicy := v1.PersistentVolumeReclaimDelete @@ -1856,7 +1856,7 @@ func runFSTypeProvisionTest(t *testing.T, k string, tc provisioningFSTypeTestcas myDefaultfsType = "" } csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), nil, nil, nil, nil, nil, false, myDefaultfsType) + nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), nil, nil, nil, nil, nil, false, myDefaultfsType, false) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ CapacityBytes: requestedBytes, @@ -1932,7 +1932,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested clientSet := fakeclientset.NewSimpleClientset(tc.clientSetObjects...) pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), nil, nil, nil, nil, nil, tc.withExtraMetadata, defaultfsType) + nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), nil, nil, nil, nil, nil, tc.withExtraMetadata, defaultfsType, false) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ CapacityBytes: requestedBytes, @@ -2667,7 +2667,7 @@ func TestProvisionFromSnapshot(t *testing.T) { pluginCaps, controllerCaps := provisionFromSnapshotCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - client, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType) + client, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, false) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -2841,7 +2841,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) { defer close(stopChan) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType, false) pv, _, err := csiProvisioner.Provision(context.Background(), controller.ProvisionOptions{ StorageClass: &storagev1.StorageClass{}, @@ -2935,7 +2935,7 @@ func TestProvisionErrorHandling(t *testing.T) { defer close(stopChan) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, 
controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType, false) options := controller.ProvisionOptions{ StorageClass: &storagev1.StorageClass{}, @@ -3008,7 +3008,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) { clientSet := fakeclientset.NewSimpleClientset() pluginCaps, controllerCaps := provisionWithTopologyCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, false) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -3345,7 +3345,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) { pluginCaps, controllerCaps := provisionCapabilities() scLister, _, _, _, vaLister, _ := listers(clientSet) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, nil, nil, nil, vaLister, false, defaultfsType) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, nil, nil, nil, vaLister, false, defaultfsType, false) err = csiProvisioner.Delete(context.Background(), tc.persistentVolume) if tc.expectErr && err == nil { @@ -3766,7 +3766,7 @@ func TestProvisionFromPVC(t *testing.T) { // Phase: execute the test csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, claimLister, nil, false, defaultfsType) + nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, claimLister, nil, false, defaultfsType, false) pv, _, err = csiProvisioner.Provision(context.Background(), tc.volOpts) if tc.expectErr && err == nil { @@ -3884,7 +3884,7 @@ func TestProvisionWithMigration(t *testing.T) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, - inTreePluginName, false, true, mockTranslator, nil, nil, nil, nil, nil, false, defaultfsType) + inTreePluginName, false, true, mockTranslator, nil, nil, nil, nil, nil, false, defaultfsType, false) // Set up return values (AnyTimes to avoid overfitting on implementation) @@ -4046,7 +4046,7 @@ func TestDeleteMigration(t *testing.T) { defer close(stopCh) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", - false, true, mockTranslator, nil, nil, nil, nil, vaLister, false, defaultfsType) + false, true, mockTranslator, nil, nil, nil, nil, vaLister, false, defaultfsType, false) // Set mock return values (AnyTimes to avoid overfitting on implementation details) mockTranslator.EXPECT().IsPVMigratable(gomock.Any()).Return(tc.expectTranslation).AnyTimes() From cacdc0be295d0400afd81c002352ea09180403be Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 24 Sep 2020 
09:06:50 +0200 Subject: [PATCH 2/7] distributed provisioning, including immediate binding Immediate binding is not recommended, but is needed for the sake of feature parity. With immediate binding also supported, the code no longer just passively checks the selected node, so a different name seems more appropriate. Besides implementing immediate binding support, the original implementation also gets fixed: DeleteVolume was called by all external-provisioner instances. On most nodes that then looked like the volume had been removed already and the PV got removed before the actual node had a chance to finish the volume deletion. --- README.md | 97 +++ cmd/csi-provisioner/csi-provisioner.go | 38 +- go.mod | 1 + go.sum | 1 + pkg/controller/controller.go | 575 ++++++++++++++---- pkg/controller/controller_test.go | 399 ++++++++++-- pkg/controller/ratelimiter.go | 53 ++ pkg/controller/ratelimiter_test.go | 37 ++ pkg/controller/topology.go | 21 + vendor/k8s.io/component-helpers/LICENSE | 202 ++++++ .../scheduling/corev1/doc.go | 23 + .../scheduling/corev1/helpers.go | 45 ++ .../corev1/nodeaffinity/nodeaffinity.go | 262 ++++++++ vendor/modules.txt | 4 + 14 files changed, 1593 insertions(+), 165 deletions(-) create mode 100644 pkg/controller/ratelimiter.go create mode 100644 pkg/controller/ratelimiter_test.go create mode 100644 vendor/k8s.io/component-helpers/LICENSE create mode 100644 vendor/k8s.io/component-helpers/scheduling/corev1/doc.go create mode 100644 vendor/k8s.io/component-helpers/scheduling/corev1/helpers.go create mode 100644 vendor/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity.go diff --git a/README.md b/README.md index fdf8f12a46..97286f1525 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,16 @@ See the [storage capacity section](#capacity-support) below for details. * `--capacity-for-immediate-binding `: Enables producing capacity information for storage classes with immediate binding. Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging. Defaults to `false`. +##### Distributed provisioning + +* `--node-deployment`: Enables deploying the external-provisioner together with a CSI driver on nodes to manage node-local volumes. Off by default. + +* `--node-deployment-immediate-binding`: Determines whether immediate binding is supported when deployed on each node. Enabled by default, use `--node-deployment-immediate-binding=false` if not desired. + +* `--node-deployment-base-delay`: Determines how long the external-provisioner sleeps initially before trying to own a PVC with immediate binding. Defaults to 20 seconds. + +* `--node-deployment-max-delay`: Determines how long the external-provisioner sleeps at most before trying to own a PVC with immediate binding. Defaults to 60 seconds. + #### Other recognized arguments * `--feature-gates `: A set of comma separated `=` pairs that describe feature gates for alpha/experimental features. See [list of features](#feature-status) or `--help` output for list of recognized features. Example: `--feature-gates Topology=true` to enable Topology feature that's disabled by default. @@ -242,6 +252,93 @@ The external-provisioner optionally exposes an HTTP endpoint at address:port spe * Metrics path, as set by `--metrics-path` argument (default is `/metrics`). * Leader election health check at `/healthz/leader-election`. 
It is recommended to run a liveness probe against this endpoint when leader election is used to kill external-provisioner leader that fails to connect to the API server to renew its leadership. See https://github.com/kubernetes-csi/csi-lib-utils/issues/66 for details. +### Deployment on each node + +Normally, external-provisioner is deployed once in a cluster and +communicates with a control instance of the CSI driver which then +provisions volumes via some kind of storage backend API. CSI drivers +which manage local storage on a node don't have such an API that a +central controller could use. + +To support this case, external-provisioner can be deployed alongside +each CSI driver on different nodes. The CSI driver deployment must: +- support topology, usually with one topology key + ("csi.example.org/node") and the Kubernetes node name as value +- use a service account that has the same RBAC rules as for a normal + deployment +- invoke external-provisioner with `--node-deployment` +- tweak `--node-deployment-base-delay` and `--node-deployment-max-delay` + to match the expected cluster size and desired response times + (only relevant when there are storage classes with immediate binding, + see below for details) +- set the `NODE_NAME` environment variable to the name of the Kubernetes node +- implement `GetCapacity` + +Usage of `--strict-topology` and `--immediate-topology=false` is +recommended because it makes the `CreateVolume` invocations simpler. + +Volume provisioning with late binding works as before, except that +each external-provisioner instance checks the "selected node" +annotation and only creates volumes if that node is the one it runs +on. It also only deletes volumes on its own node. + +Immediate binding is also supported, but not recommended. It is +implemented by letting the external-provisioner instances assign a PVC +to one of them: when they see a new PVC with immediate binding, they +all attempt to set the "selected node" annotation with their own node +name as value. Only one update request can succeed, all others get a +"conflict" error and then know that some other instance was faster. To +avoid the thundering herd problem, each instance waits for a random +period before issuing an update request. + +When `CreateVolume` call fails with `ResourcesExhausted`, the normal +recovery mechanism is used, i.e. the external-provisioner instance +removes the "selected node" annotation and the process repeats. But +this triggers events for the PVC and delays volume creation, in +particular when storage is exhausted on most nodes. Therefore +external-provisioner checks with `GetCapacity` *before* attempting to +own a PVC whether the currently available capacity is sufficient for +the volume. When it isn't, the PVC is ignored and some other instance +can own it. + +The `--node-deployment-base-delay` parameter determines the initial +wait period. It also sets the jitter, so in practice the initial wait period will be +in the range from zero to the base delay. After a collision, the delay +increases exponentially. If the value is high, volumes with immediate +binding get created more slowly. If it is low, then the risk of +conflicts while setting the "selected node" annotation increases and +the apiserver load will be higher. + +There is an exponential backoff per PVC which is used for unexpected +problems. Normally, an owner for a PVC is chosen during the first +attempt, so most PVCs will use the base delays. 
A maximum can be set +with `--node-deployment-max-delay` anyway, to avoid very long delays +when something went wrong repeatedly. + +During scale testing with 100 external-provisioner instances, a base +delay of 20 seconds worked well. When provisioning 3000 volumes, there +were only 500 conflicts which the apiserver handled without getting +overwhelmed. The average provisioning rate of around 40 volumes/second +was the same as with a delay of 10 seconds. The worst-case latency per +volume was probably higher, but that wasn't measured. + +Note that the QPS settings of kube-controller-manager and +external-provisioner have to be increased at the moment (Kubernetes +1.19) to provision volumes faster than around 4 volumes/second. + +Beware that if *no* node has sufficient storage available, then also +no `CreateVolume` call is attempted and thus no events are generated +for the PVC, i.e. some other means of tracking remaining storage +capacity must be used to detect when the cluster runs out of storage. + +Because PVCs with immediate binding get distributed randomly among +nodes, they get spread evenly. If that is not desirable, then it is +possible to disable support for immediate binding in distributed +provisioning with `--node-deployment-immediate-binding=false` and +instead implement a custom policy in a separate controller which sets +the "selected node" annotation to trigger local provisioning on the +desired node. + ## Community, discussion, contribution, and support Learn how to engage with the Kubernetes community on the [community page](http://kubernetes.io/community/). diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index 5f088a2582..7c21e3196b 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -89,7 +89,11 @@ var ( capacityImmediateBinding = flag.Bool("capacity-for-immediate-binding", false, "Enables producing capacity information for storage classes with immediate binding. 
Not needed for the Kubernetes scheduler, maybe useful for other consumers or for debugging.")
	capacityPollInterval  = flag.Duration("capacity-poll-interval", time.Minute, "How long the external-provisioner waits before checking for storage capacity changes.")
	capacityOwnerrefLevel = flag.Int("capacity-ownerref-level", 1, "The level indicates the number of objects that need to be traversed starting from the pod identified by the POD_NAME and POD_NAMESPACE environment variables to reach the owning object for CSIStorageCapacity objects: 0 for the pod itself, 1 for a StatefulSet, 2 for a Deployment, etc.")
-	enableNodeCheck = flag.Bool("enable-node-check", false, "Enables a check to see that the node selected by the scheduler for provisioning is this node.")
+
+	enableNodeDeployment           = flag.Bool("node-deployment", false, "Enables deploying the external-provisioner together with a CSI driver on nodes to manage node-local volumes.")
+	nodeDeploymentImmediateBinding = flag.Bool("node-deployment-immediate-binding", true, "Determines whether immediate binding is supported when deployed on each node.")
+	nodeDeploymentBaseDelay        = flag.Duration("node-deployment-base-delay", 20*time.Second, "Determines how long the external-provisioner sleeps initially before trying to own a PVC with immediate binding.")
+	nodeDeploymentMaxDelay         = flag.Duration("node-deployment-max-delay", 60*time.Second, "Determines how long the external-provisioner sleeps at most before trying to own a PVC with immediate binding.")

	featureGates map[string]bool
	provisionController *controller.ProvisionController
@@ -117,6 +121,11 @@ func main() {
		klog.Fatal(err)
	}

+	node := os.Getenv("NODE_NAME")
+	if *enableNodeDeployment && node == "" {
+		klog.Fatal("The NODE_NAME environment variable must be set when using --node-deployment.")
+	}
+
	if *showVersion {
		fmt.Println(os.Args[0], version)
		os.Exit(0)
@@ -215,6 +224,9 @@ func main() {
	// Generate a unique ID for this provisioner
	timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
	identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
+	if *enableNodeDeployment {
+		identity = identity + "-" + node
+	}

	factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
	var factoryForNamespace informers.SharedInformerFactory // usually nil, only used for CSIStorageCapacity
@@ -225,7 +237,6 @@ func main() {
	scLister := factory.Storage().V1().StorageClasses().Lister()
	claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()

-	var csiNodeLister storagelistersv1.CSINodeLister
	var vaLister storagelistersv1.VolumeAttachmentLister
	if controllerCapabilities[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME] {
		klog.Info("CSI driver supports PUBLISH_UNPUBLISH_VOLUME, watching VolumeAttachments")
@@ -233,8 +244,29 @@ func main() {
	} else {
		klog.Info("CSI driver does not support PUBLISH_UNPUBLISH_VOLUME, not watching VolumeAttachments")
	}
+
+	var nodeDeployment *ctrl.NodeDeployment
+	if *enableNodeDeployment {
+		nodeDeployment = &ctrl.NodeDeployment{
+			NodeName:         node,
+			ClaimInformer:    factory.Core().V1().PersistentVolumeClaims(),
+			ImmediateBinding: *nodeDeploymentImmediateBinding,
+			BaseDelay:        *nodeDeploymentBaseDelay,
+			MaxDelay:         *nodeDeploymentMaxDelay,
+		}
+		nodeInfo, err := ctrl.GetNodeInfo(grpcClient, *operationTimeout)
+		if err != nil {
+			klog.Fatalf("Failed to get node info from CSI driver: %v", err)
+		}
+		nodeDeployment.NodeInfo = *nodeInfo
+	}
+
+	var nodeLister v1.NodeLister
+	var
csiNodeLister storagelistersv1.CSINodeLister if ctrl.SupportsTopology(pluginCapabilities) { + // TODO (?): when deployed on each node with --strict-topology=true, then the topology + // code only needs the static information about the local node. We can avoid the overhead + // of watching the actual objects by providing just that information. csiNodeLister = factory.Storage().V1().CSINodes().Lister() nodeLister = factory.Core().V1().Nodes().Lister() } @@ -293,7 +325,7 @@ func main() { vaLister, *extraCreateMetadata, *defaultFSType, - *enableNodeCheck, + nodeDeployment, ) provisionController = controller.NewProvisionController( diff --git a/go.mod b/go.mod index 8d6b156df3..c4fe3258d5 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( k8s.io/apiserver v0.20.0 k8s.io/client-go v0.20.0 k8s.io/component-base v0.20.0 + k8s.io/component-helpers v0.20.0 k8s.io/csi-translation-lib v0.20.0 k8s.io/klog/v2 v2.4.0 k8s.io/kubernetes v1.20.0 diff --git a/go.sum b/go.sum index d51d31aa8a..79c72dd885 100644 --- a/go.sum +++ b/go.sum @@ -1238,6 +1238,7 @@ k8s.io/cluster-bootstrap v0.20.0/go.mod h1:6WZaNIBvcvL7MkPzSRKrZDIr4u+ePW2oIWoRs k8s.io/code-generator v0.20.0/go.mod h1:UsqdF+VX4PU2g46NC2JRs4gc+IfrctnwHb76RNbWHJg= k8s.io/component-base v0.20.0 h1:BXGL8iitIQD+0NgW49UsM7MraNUUGDU3FBmrfUAtmVQ= k8s.io/component-base v0.20.0/go.mod h1:wKPj+RHnAr8LW2EIBIK7AxOHPde4gme2lzXwVSoRXeA= +k8s.io/component-helpers v0.20.0 h1:7Zi1fcb5nV0h03d9eeZGk71+ZWYvAN4Be+xMOZyFerc= k8s.io/component-helpers v0.20.0/go.mod h1:nx6NOtfSfGOxnSZsDJxpGbnsVuUA1UXpwDvZIrtigNk= k8s.io/controller-manager v0.20.0/go.mod h1:nD4qym/pmCz2v1tpqvlEBVlHW9CAZwedloM8GrJTLpg= k8s.io/cri-api v0.20.0/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI= diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 16f7a629ae..1e6e475fb7 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -31,19 +31,23 @@ import ( "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" _ "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" storagelistersv1 "k8s.io/client-go/listers/storage/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "sigs.k8s.io/sig-storage-lib-external-provisioner/v6/controller" "sigs.k8s.io/sig-storage-lib-external-provisioner/v6/util" @@ -131,6 +135,7 @@ const ( annMigratedTo = "pv.kubernetes.io/migrated-to" annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner" + annSelectedNode = "volume.kubernetes.io/selected-node" snapshotNotBound = "snapshot %s not bound" @@ -201,7 +206,22 @@ type requiredCapabilities struct { clone bool } -// CSIProvisioner struct +// NodeDeployment contains additional parameters for running external-provisioner alongside a +// CSI driver on one or more nodes in the cluster. 
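+// NodeName must be the name of the Kubernetes node that the driver runs on, ClaimInformer is used
+// to watch PVCs, NodeInfo supplies the node's topology for the volume accessibility check,
+// ImmediateBinding enables taking over PVCs that use immediate binding, and BaseDelay/MaxDelay
+// bound the randomized, exponentially increasing wait before trying to become the owner of such a PVC.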
+type NodeDeployment struct { + NodeName string + ClaimInformer coreinformers.PersistentVolumeClaimInformer + NodeInfo csi.NodeGetInfoResponse + ImmediateBinding bool + BaseDelay time.Duration + MaxDelay time.Duration +} + +type internalNodeDeployment struct { + NodeDeployment + rateLimiter workqueue.RateLimiter +} + type csiProvisioner struct { client kubernetes.Interface csiClient csi.ControllerClient @@ -227,7 +247,7 @@ type csiProvisioner struct { vaLister storagelistersv1.VolumeAttachmentLister extraCreateMetadata bool eventRecorder record.EventRecorder - enableNodeCheck bool + nodeDeployment *internalNodeDeployment } var _ controller.Provisioner = &csiProvisioner{} @@ -273,6 +293,13 @@ func GetDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (rpc.Pl return pluginCapabilities, controllerCapabilities, nil } +func GetNodeInfo(conn *grpc.ClientConn, timeout time.Duration) (*csi.NodeGetInfoResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + client := csi.NewNodeClient(conn) + return client.NodeGetInfo(ctx, &csi.NodeGetInfoRequest{}) +} + // NewCSIProvisioner creates new CSI provisioner. // // vaLister is optional and only needed when VolumeAttachments are @@ -298,7 +325,7 @@ func NewCSIProvisioner(client kubernetes.Interface, vaLister storagelistersv1.VolumeAttachmentLister, extraCreateMetadata bool, defaultFSType string, - enableNodeCheck bool, + nodeDeployment *NodeDeployment, ) controller.Provisioner { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -306,6 +333,7 @@ func NewCSIProvisioner(client kubernetes.Interface, eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("external-provisioner")}) csiClient := csi.NewControllerClient(grpcClient) + provisioner := &csiProvisioner{ client: client, grpcClient: grpcClient, @@ -330,8 +358,23 @@ func NewCSIProvisioner(client kubernetes.Interface, vaLister: vaLister, extraCreateMetadata: extraCreateMetadata, eventRecorder: eventRecorder, - enableNodeCheck: enableNodeCheck, } + if nodeDeployment != nil { + provisioner.nodeDeployment = &internalNodeDeployment{ + NodeDeployment: *nodeDeployment, + rateLimiter: newItemExponentialFailureRateLimiterWithJitter(nodeDeployment.BaseDelay, nodeDeployment.MaxDelay), + } + // Remove deleted PVCs from rate limiter. 
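+		// Forgetting the claim UID here keeps the per-claim backoff state from accumulating
+		// for PVCs that no longer exist.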
+ claimHandler := cache.ResourceEventHandlerFuncs{ + DeleteFunc: func(obj interface{}) { + if claim, ok := obj.(*v1.PersistentVolumeClaim); ok { + provisioner.nodeDeployment.rateLimiter.Forget(claim.UID) + } + }, + } + provisioner.nodeDeployment.ClaimInformer.Informer().AddEventHandler(claimHandler) + } + return provisioner } @@ -418,44 +461,35 @@ func getAccessMode(pvcAccessMode v1.PersistentVolumeAccessMode) *csi.VolumeCapab } func getVolumeCapability( - options controller.ProvisionOptions, + claim *v1.PersistentVolumeClaim, + sc *storagev1.StorageClass, pvcAccessMode v1.PersistentVolumeAccessMode, fsType string, ) *csi.VolumeCapability { - if util.CheckPersistentVolumeClaimModeBlock(options.PVC) { + if util.CheckPersistentVolumeClaimModeBlock(claim) { return &csi.VolumeCapability{ AccessType: getAccessTypeBlock(), AccessMode: getAccessMode(pvcAccessMode), } } return &csi.VolumeCapability{ - AccessType: getAccessTypeMount(fsType, options.StorageClass.MountOptions), + AccessType: getAccessTypeMount(fsType, sc.MountOptions), AccessMode: getAccessMode(pvcAccessMode), } } -func (p *csiProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) { - if options.StorageClass == nil { - return nil, controller.ProvisioningFinished, errors.New("storage class was nil") - } - - if options.PVC.Annotations[annStorageProvisioner] != p.driverName && options.PVC.Annotations[annMigratedTo] != p.driverName { - // The storage provisioner annotation may not equal driver name but the - // PVC could have annotation "migrated-to" which is the new way to - // signal a PVC is migrated (k8s v1.17+) - return nil, controller.ProvisioningFinished, &controller.IgnoredError{ - Reason: fmt.Sprintf("PVC annotated with external-provisioner name %s does not match provisioner driver name %s. This could mean the PVC is not migrated", - options.PVC.Annotations[annStorageProvisioner], - p.driverName), - } - - } +type prepareProvisionResult struct { + fsType string + migratedVolume bool + req *csi.CreateVolumeRequest + csiPVSource *v1.CSIPersistentVolumeSource +} - if p.enableNodeCheck && options.SelectedNode.Name != os.Getenv("NODE_NAME") { - return nil, controller.ProvisioningNoChange, &controller.IgnoredError{ - Reason: fmt.Sprintf("Selected node (%s) is not current node (%s)", options.SelectedNode.Name, os.Getenv("NODE_NAME")), - } +// prepareProvision does non-destructive parameter checking and preparations for provisioning a volume. +func (p *csiProvisioner) prepareProvision(ctx context.Context, claim *v1.PersistentVolumeClaim, sc *storagev1.StorageClass, selectedNode *v1.Node) (*prepareProvisionResult, controller.ProvisioningState, error) { + if sc == nil { + return nil, controller.ProvisioningFinished, errors.New("storage class was nil") } migratedVolume := false @@ -464,31 +498,31 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi // the in-tree provisioner name in case of CSI migration scenarios. 
The annotation will be // set to the CSI provisioner name by PV controller for migration scenarios // so that external provisioner can correctly pick up the PVC pointing to an in-tree plugin - if options.StorageClass.Provisioner == p.supportsMigrationFromInTreePluginName { - klog.V(2).Infof("translating storage class for in-tree plugin %s to CSI", options.StorageClass.Provisioner) - storageClass, err := p.translator.TranslateInTreeStorageClassToCSI(p.supportsMigrationFromInTreePluginName, options.StorageClass) + if sc.Provisioner == p.supportsMigrationFromInTreePluginName { + klog.V(2).Infof("translating storage class for in-tree plugin %s to CSI", sc.Provisioner) + storageClass, err := p.translator.TranslateInTreeStorageClassToCSI(p.supportsMigrationFromInTreePluginName, sc) if err != nil { return nil, controller.ProvisioningFinished, fmt.Errorf("failed to translate storage class: %v", err) } - options.StorageClass = storageClass + sc = storageClass migratedVolume = true } else { - klog.V(4).Infof("skip translation of storage class for plugin: %s", options.StorageClass.Provisioner) + klog.V(4).Infof("skip translation of storage class for plugin: %s", sc.Provisioner) } } // Make sure the plugin is capable of fulfilling the requested options rc := &requiredCapabilities{} - if options.PVC.Spec.DataSource != nil { + if claim.Spec.DataSource != nil { // PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object - if options.PVC.Spec.DataSource.Name == "" { - return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name) + if claim.Spec.DataSource.Name == "" { + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source not found for PVC %s", claim.Name) } - switch options.PVC.Spec.DataSource.Kind { + switch claim.Spec.DataSource.Kind { case snapshotKind: - if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup { - return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup)) + if *(claim.Spec.DataSource.APIGroup) != snapshotAPIGroup { + return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source does not belong to the right APIGroup. 
Expected %s, Got %s", snapshotAPIGroup, *(claim.Spec.DataSource.APIGroup)) } rc.snapshot = true case pvcKind: @@ -496,10 +530,10 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi default: // DataSource is not VolumeSnapshot and PVC // Assume external data populator to create the volume, and there is no more work for us to do - p.eventRecorder.Event(options.PVC, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("Assuming an external populator will provision the volume")) + p.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("Assuming an external populator will provision the volume")) return nil, controller.ProvisioningFinished, &controller.IgnoredError{ Reason: fmt.Sprintf("data source (%s) is not handled by the provisioner, assuming an external populator will provision it", - options.PVC.Spec.DataSource.Kind), + claim.Spec.DataSource.Kind), } } } @@ -507,18 +541,18 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi return nil, controller.ProvisioningFinished, err } - if options.PVC.Spec.Selector != nil { + if claim.Spec.Selector != nil { return nil, controller.ProvisioningFinished, fmt.Errorf("claim Selector is not supported") } - pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength) + pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", claim.ObjectMeta.UID), p.volumeNameUUIDLength) if err != nil { return nil, controller.ProvisioningFinished, err } fsTypesFound := 0 fsType := "" - for k, v := range options.StorageClass.Parameters { + for k, v := range sc.Parameters { if strings.ToLower(k) == "fstype" || k == prefixedFsTypeKey { fsType = v fsTypesFound++ @@ -534,35 +568,35 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi fsType = p.defaultFSType } - capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] + capacity := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] volSizeBytes := capacity.Value() // Get access mode volumeCaps := make([]*csi.VolumeCapability, 0) - for _, pvcAccessMode := range options.PVC.Spec.AccessModes { - volumeCaps = append(volumeCaps, getVolumeCapability(options, pvcAccessMode, fsType)) + for _, pvcAccessMode := range claim.Spec.AccessModes { + volumeCaps = append(volumeCaps, getVolumeCapability(claim, sc, pvcAccessMode, fsType)) } // Create a CSI CreateVolumeRequest and Response req := csi.CreateVolumeRequest{ Name: pvName, - Parameters: options.StorageClass.Parameters, + Parameters: sc.Parameters, VolumeCapabilities: volumeCaps, CapacityRange: &csi.CapacityRange{ RequiredBytes: int64(volSizeBytes), }, } - if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) { - volumeContentSource, err := p.getVolumeContentSource(ctx, options) + if claim.Spec.DataSource != nil && (rc.clone || rc.snapshot) { + volumeContentSource, err := p.getVolumeContentSource(ctx, claim, sc) if err != nil { - return nil, controller.ProvisioningNoChange, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err) + return nil, controller.ProvisioningNoChange, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", claim.Spec.DataSource.Kind, claim.Spec.DataSource.Name, err) } req.VolumeContentSource = volumeContentSource } - if options.PVC.Spec.DataSource != nil && rc.clone { - err = p.setCloneFinalizer(ctx, options.PVC) + if 
claim.Spec.DataSource != nil && rc.clone { + err = p.setCloneFinalizer(ctx, claim) if err != nil { return nil, controller.ProvisioningNoChange, err } @@ -572,9 +606,9 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi requirements, err := GenerateAccessibilityRequirements( p.client, p.driverName, - options.PVC.Name, - options.StorageClass.AllowedTopologies, - options.SelectedNode, + claim.Name, + sc.AllowedTopologies, + selectedNode, p.strictTopology, p.immediateTopology, p.csiNodeLister, @@ -585,15 +619,11 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi req.AccessibilityRequirements = requirements } - klog.V(5).Infof("CreateVolumeRequest %+v", req) - - rep := &csi.CreateVolumeResponse{} - // Resolve provision secret credentials. - provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.StorageClass.Parameters, pvName, &v1.PersistentVolumeClaim{ + provisionerSecretRef, err := getSecretReference(provisionerSecretParams, sc.Parameters, pvName, &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ - Name: options.PVC.Name, - Namespace: options.PVC.Namespace, + Name: claim.Name, + Namespace: claim.Namespace, }, }) if err != nil { @@ -606,38 +636,91 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi req.Secrets = provisionerCredentials // Resolve controller publish, node stage, node publish secret references - controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC) + controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, sc.Parameters, pvName, claim) if err != nil { return nil, controller.ProvisioningNoChange, err } - nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.StorageClass.Parameters, pvName, options.PVC) + nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, sc.Parameters, pvName, claim) if err != nil { return nil, controller.ProvisioningNoChange, err } - nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC) + nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, sc.Parameters, pvName, claim) if err != nil { return nil, controller.ProvisioningNoChange, err } - controllerExpandSecretRef, err := getSecretReference(controllerExpandSecretParams, options.StorageClass.Parameters, pvName, options.PVC) + controllerExpandSecretRef, err := getSecretReference(controllerExpandSecretParams, sc.Parameters, pvName, claim) if err != nil { return nil, controller.ProvisioningNoChange, err } + csiPVSource := &v1.CSIPersistentVolumeSource{ + Driver: p.driverName, + // VolumeHandle and VolumeAttributes will be added after provisioning. 
+ ControllerPublishSecretRef: controllerPublishSecretRef, + NodeStageSecretRef: nodeStageSecretRef, + NodePublishSecretRef: nodePublishSecretRef, + ControllerExpandSecretRef: controllerExpandSecretRef, + } - req.Parameters, err = removePrefixedParameters(options.StorageClass.Parameters) + req.Parameters, err = removePrefixedParameters(sc.Parameters) if err != nil { return nil, controller.ProvisioningFinished, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err) } if p.extraCreateMetadata { // add pvc and pv metadata to request for use by the plugin - req.Parameters[pvcNameKey] = options.PVC.GetName() - req.Parameters[pvcNamespaceKey] = options.PVC.GetNamespace() + req.Parameters[pvcNameKey] = claim.GetName() + req.Parameters[pvcNamespaceKey] = claim.GetNamespace() req.Parameters[pvNameKey] = pvName } + return &prepareProvisionResult{ + fsType: fsType, + migratedVolume: migratedVolume, + req: &req, + csiPVSource: csiPVSource, + }, controller.ProvisioningNoChange, nil +} + +func (p *csiProvisioner) Provision(ctx context.Context, options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) { + claim := options.PVC + if claim.Annotations[annStorageProvisioner] != p.driverName && claim.Annotations[annMigratedTo] != p.driverName { + // The storage provisioner annotation may not equal driver name but the + // PVC could have annotation "migrated-to" which is the new way to + // signal a PVC is migrated (k8s v1.17+) + return nil, controller.ProvisioningFinished, &controller.IgnoredError{ + Reason: fmt.Sprintf("PVC annotated with external-provisioner name %s does not match provisioner driver name %s. This could mean the PVC is not migrated", + claim.Annotations[annStorageProvisioner], + p.driverName), + } + + } + + // The same check already ran in ShouldProvision, but perhaps + // it couldn't complete due to some unexpected error. 
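+	// Passing the storage class from the provisioning options also lets checkNode skip the
+	// StorageClass lister lookup.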
+ owned, err := p.checkNode(ctx, claim, options.StorageClass, "provision") + if err != nil { + return nil, controller.ProvisioningNoChange, + fmt.Errorf("node check failed: %v", err) + } + if !owned { + return nil, controller.ProvisioningNoChange, &controller.IgnoredError{ + Reason: fmt.Sprintf("not responsible for provisioning of PVC %s/%s because it is not assigned to node %q", claim.Namespace, claim.Name, p.nodeDeployment.NodeName), + } + } + + result, state, err := p.prepareProvision(ctx, claim, options.StorageClass, options.SelectedNode) + if result == nil { + return nil, state, err + } + req := result.req + volSizeBytes := req.CapacityRange.RequiredBytes + pvName := req.Name + provisionerCredentials := req.Secrets + createCtx, cancel := context.WithTimeout(ctx, p.timeout) defer cancel() - rep, err = p.csiClient.CreateVolume(createCtx, &req) + klog.V(5).Infof("CreateVolumeRequest %+v", req) + rep, err := p.csiClient.CreateVolume(createCtx, req) if err != nil { // Giving up after an error and telling the pod scheduler to retry with a different node @@ -707,6 +790,8 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi } } + result.csiPVSource.VolumeHandle = p.volumeIdToHandle(rep.Volume.VolumeId) + result.csiPVSource.VolumeAttributes = volumeAttributes pv := &v1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ Name: pvName, @@ -719,15 +804,7 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi }, // TODO wait for CSI VolumeSource API PersistentVolumeSource: v1.PersistentVolumeSource{ - CSI: &v1.CSIPersistentVolumeSource{ - Driver: p.driverName, - VolumeHandle: p.volumeIdToHandle(rep.Volume.VolumeId), - VolumeAttributes: volumeAttributes, - ControllerPublishSecretRef: controllerPublishSecretRef, - NodeStageSecretRef: nodeStageSecretRef, - NodePublishSecretRef: nodePublishSecretRef, - ControllerExpandSecretRef: controllerExpandSecretRef, - }, + CSI: result.csiPVSource, }, }, } @@ -746,12 +823,12 @@ func (p *csiProvisioner) Provision(ctx context.Context, options controller.Provi } // Set FSType if PV is not Block Volume if !util.CheckPersistentVolumeClaimModeBlock(options.PVC) { - pv.Spec.PersistentVolumeSource.CSI.FSType = fsType + pv.Spec.PersistentVolumeSource.CSI.FSType = result.fsType } klog.V(2).Infof("successfully created PV %v for PVC %v and csi volume name %v", pv.Name, options.PVC.Name, pv.Spec.CSI.VolumeHandle) - if migratedVolume { + if result.migratedVolume { pv, err = p.translator.TranslateCSIPVToInTree(pv) if err != nil { klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err) @@ -823,12 +900,12 @@ func removePrefixedParameters(param map[string]string) (map[string]string, error // currently we provide Snapshot and PVC, the default case allows the provisioner to still create a volume // so that an external controller can act upon it. 
Additional DataSource types can be added here with // an appropriate implementation function -func (p *csiProvisioner) getVolumeContentSource(ctx context.Context, options controller.ProvisionOptions) (*csi.VolumeContentSource, error) { - switch options.PVC.Spec.DataSource.Kind { +func (p *csiProvisioner) getVolumeContentSource(ctx context.Context, claim *v1.PersistentVolumeClaim, sc *storagev1.StorageClass) (*csi.VolumeContentSource, error) { + switch claim.Spec.DataSource.Kind { case snapshotKind: - return p.getSnapshotSource(ctx, options) + return p.getSnapshotSource(ctx, claim, sc) case pvcKind: - return p.getPVCSource(ctx, options) + return p.getPVCSource(ctx, claim, sc) default: // For now we shouldn't pass other things to this function, but treat it as a noop and extend as needed return nil, nil @@ -837,32 +914,32 @@ func (p *csiProvisioner) getVolumeContentSource(ctx context.Context, options con // getPVCSource verifies DataSource.Kind of type PersistentVolumeClaim, making sure that the requested PVC is available/ready // returns the VolumeContentSource for the requested PVC -func (p *csiProvisioner) getPVCSource(ctx context.Context, options controller.ProvisionOptions) (*csi.VolumeContentSource, error) { - sourcePVC, err := p.claimLister.PersistentVolumeClaims(options.PVC.Namespace).Get(options.PVC.Spec.DataSource.Name) +func (p *csiProvisioner) getPVCSource(ctx context.Context, claim *v1.PersistentVolumeClaim, sc *storagev1.StorageClass) (*csi.VolumeContentSource, error) { + sourcePVC, err := p.claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Spec.DataSource.Name) if err != nil { - return nil, fmt.Errorf("error getting PVC %s (namespace %q) from api server: %v", options.PVC.Spec.DataSource.Name, options.PVC.Namespace, err) + return nil, fmt.Errorf("error getting PVC %s (namespace %q) from api server: %v", claim.Spec.DataSource.Name, claim.Namespace, err) } if string(sourcePVC.Status.Phase) != "Bound" { - return nil, fmt.Errorf("the PVC DataSource %s must have a status of Bound. Got %v", options.PVC.Spec.DataSource.Name, sourcePVC.Status) + return nil, fmt.Errorf("the PVC DataSource %s must have a status of Bound. Got %v", claim.Spec.DataSource.Name, sourcePVC.Status) } if sourcePVC.ObjectMeta.DeletionTimestamp != nil { - return nil, fmt.Errorf("the PVC DataSource %s is currently being deleted", options.PVC.Spec.DataSource.Name) + return nil, fmt.Errorf("the PVC DataSource %s is currently being deleted", claim.Spec.DataSource.Name) } if sourcePVC.Spec.StorageClassName == nil { return nil, fmt.Errorf("the source PVC (%s) storageclass cannot be empty", sourcePVC.Name) } - if options.PVC.Spec.StorageClassName == nil { - return nil, fmt.Errorf("the requested PVC (%s) storageclass cannot be empty", options.PVC.Name) + if claim.Spec.StorageClassName == nil { + return nil, fmt.Errorf("the requested PVC (%s) storageclass cannot be empty", claim.Name) } - if *sourcePVC.Spec.StorageClassName != *options.PVC.Spec.StorageClassName { + if *sourcePVC.Spec.StorageClassName != *claim.Spec.StorageClassName { return nil, fmt.Errorf("the source PVC and destination PVCs must be in the same storage class for cloning. 
Source is in %v, but new PVC is in %v", - *sourcePVC.Spec.StorageClassName, *options.PVC.Spec.StorageClassName) + *sourcePVC.Spec.StorageClassName, *claim.Spec.StorageClassName) } - capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] + capacity := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] requestedSize := capacity.Value() srcCapacity := sourcePVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] srcPVCSize := srcCapacity.Value() @@ -885,8 +962,8 @@ func (p *csiProvisioner) getPVCSource(ctx context.Context, options controller.Pr return nil, fmt.Errorf("claim in dataSource not bound or invalid") } - if sourcePV.Spec.CSI.Driver != options.StorageClass.Provisioner { - klog.Warningf("the source volume %s for PVC %s/%s is handled by a different CSI driver than requested by StorageClass %s", sourcePVC.Spec.VolumeName, sourcePVC.Namespace, sourcePVC.Name, *options.PVC.Spec.StorageClassName) + if sourcePV.Spec.CSI.Driver != sc.Provisioner { + klog.Warningf("the source volume %s for PVC %s/%s is handled by a different CSI driver than requested by StorageClass %s", sourcePVC.Spec.VolumeName, sourcePVC.Namespace, sourcePVC.Name, *claim.Spec.StorageClassName) return nil, fmt.Errorf("claim in dataSource not bound or invalid") } @@ -905,13 +982,13 @@ func (p *csiProvisioner) getPVCSource(ctx context.Context, options controller.Pr return nil, fmt.Errorf("claim in dataSource not bound or invalid") } - if options.PVC.Spec.VolumeMode == nil || *options.PVC.Spec.VolumeMode == v1.PersistentVolumeFilesystem { + if claim.Spec.VolumeMode == nil || *claim.Spec.VolumeMode == v1.PersistentVolumeFilesystem { if sourcePV.Spec.VolumeMode != nil && *sourcePV.Spec.VolumeMode != v1.PersistentVolumeFilesystem { return nil, fmt.Errorf("the source PVC and destination PVCs must have the same volume mode for cloning. Source is Block, but new PVC requested Filesystem") } } - if options.PVC.Spec.VolumeMode != nil && *options.PVC.Spec.VolumeMode == v1.PersistentVolumeBlock { + if claim.Spec.VolumeMode != nil && *claim.Spec.VolumeMode == v1.PersistentVolumeBlock { if sourcePV.Spec.VolumeMode == nil || *sourcePV.Spec.VolumeMode != v1.PersistentVolumeBlock { return nil, fmt.Errorf("the source PVC and destination PVCs must have the same volume mode for cloning. 
Source is Filesystem, but new PVC requested Block") } @@ -932,46 +1009,46 @@ func (p *csiProvisioner) getPVCSource(ctx context.Context, options controller.Pr // getSnapshotSource verifies DataSource.Kind of type VolumeSnapshot, making sure that the requested Snapshot is available/ready // returns the VolumeContentSource for the requested snapshot -func (p *csiProvisioner) getSnapshotSource(ctx context.Context, options controller.ProvisionOptions) (*csi.VolumeContentSource, error) { - snapshotObj, err := p.snapshotClient.SnapshotV1beta1().VolumeSnapshots(options.PVC.Namespace).Get(ctx, options.PVC.Spec.DataSource.Name, metav1.GetOptions{}) +func (p *csiProvisioner) getSnapshotSource(ctx context.Context, claim *v1.PersistentVolumeClaim, sc *storagev1.StorageClass) (*csi.VolumeContentSource, error) { + snapshotObj, err := p.snapshotClient.SnapshotV1beta1().VolumeSnapshots(claim.Namespace).Get(ctx, claim.Spec.DataSource.Name, metav1.GetOptions{}) if err != nil { - return nil, fmt.Errorf("error getting snapshot %s from api server: %v", options.PVC.Spec.DataSource.Name, err) + return nil, fmt.Errorf("error getting snapshot %s from api server: %v", claim.Spec.DataSource.Name, err) } if snapshotObj.ObjectMeta.DeletionTimestamp != nil { - return nil, fmt.Errorf("snapshot %s is currently being deleted", options.PVC.Spec.DataSource.Name) + return nil, fmt.Errorf("snapshot %s is currently being deleted", claim.Spec.DataSource.Name) } klog.V(5).Infof("VolumeSnapshot %+v", snapshotObj) if snapshotObj.Status == nil || snapshotObj.Status.BoundVolumeSnapshotContentName == nil { - return nil, fmt.Errorf(snapshotNotBound, options.PVC.Spec.DataSource.Name) + return nil, fmt.Errorf(snapshotNotBound, claim.Spec.DataSource.Name) } snapContentObj, err := p.snapshotClient.SnapshotV1beta1().VolumeSnapshotContents().Get(ctx, *snapshotObj.Status.BoundVolumeSnapshotContentName, metav1.GetOptions{}) if err != nil { klog.Warningf("error getting snapshotcontent %s for snapshot %s/%s from api server: %s", *snapshotObj.Status.BoundVolumeSnapshotContentName, snapshotObj.Namespace, snapshotObj.Name, err) - return nil, fmt.Errorf(snapshotNotBound, options.PVC.Spec.DataSource.Name) + return nil, fmt.Errorf(snapshotNotBound, claim.Spec.DataSource.Name) } if snapContentObj.Spec.VolumeSnapshotRef.UID != snapshotObj.UID || snapContentObj.Spec.VolumeSnapshotRef.Namespace != snapshotObj.Namespace || snapContentObj.Spec.VolumeSnapshotRef.Name != snapshotObj.Name { klog.Warningf("snapshotcontent %s for snapshot %s/%s is bound to a different snapshot", *snapshotObj.Status.BoundVolumeSnapshotContentName, snapshotObj.Namespace, snapshotObj.Name) - return nil, fmt.Errorf(snapshotNotBound, options.PVC.Spec.DataSource.Name) + return nil, fmt.Errorf(snapshotNotBound, claim.Spec.DataSource.Name) } - if snapContentObj.Spec.Driver != options.StorageClass.Provisioner { - klog.Warningf("snapshotcontent %s for snapshot %s/%s is handled by a different CSI driver than requested by StorageClass %s", *snapshotObj.Status.BoundVolumeSnapshotContentName, snapshotObj.Namespace, snapshotObj.Name, options.StorageClass.Name) - return nil, fmt.Errorf(snapshotNotBound, options.PVC.Spec.DataSource.Name) + if snapContentObj.Spec.Driver != sc.Provisioner { + klog.Warningf("snapshotcontent %s for snapshot %s/%s is handled by a different CSI driver than requested by StorageClass %s", *snapshotObj.Status.BoundVolumeSnapshotContentName, snapshotObj.Namespace, snapshotObj.Name, sc.Name) + return nil, fmt.Errorf(snapshotNotBound, claim.Spec.DataSource.Name) } if 
snapshotObj.Status.ReadyToUse == nil || *snapshotObj.Status.ReadyToUse == false { - return nil, fmt.Errorf("snapshot %s is not Ready", options.PVC.Spec.DataSource.Name) + return nil, fmt.Errorf("snapshot %s is not Ready", claim.Spec.DataSource.Name) } klog.V(5).Infof("VolumeSnapshotContent %+v", snapContentObj) if snapContentObj.Status == nil || snapContentObj.Status.SnapshotHandle == nil { - return nil, fmt.Errorf("snapshot handle %s is not available", options.PVC.Spec.DataSource.Name) + return nil, fmt.Errorf("snapshot handle %s is not available", claim.Spec.DataSource.Name) } snapshotSource := csi.VolumeContentSource_Snapshot{ @@ -982,9 +1059,9 @@ func (p *csiProvisioner) getSnapshotSource(ctx context.Context, options controll klog.V(5).Infof("VolumeContentSource_Snapshot %+v", snapshotSource) if snapshotObj.Status.RestoreSize != nil { - capacity, exists := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] + capacity, exists := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] if !exists { - return nil, fmt.Errorf("error getting capacity for PVC %s when creating snapshot %s", options.PVC.Name, snapshotObj.Name) + return nil, fmt.Errorf("error getting capacity for PVC %s when creating snapshot %s", claim.Name, snapshotObj.Name) } volSizeBytes := capacity.Value() klog.V(5).Infof("Requested volume size is %d and snapshot size is %d for the source snapshot %s", int64(volSizeBytes), int64(snapshotObj.Status.RestoreSize.Value()), snapshotObj.Name) @@ -1026,6 +1103,21 @@ func (p *csiProvisioner) Delete(ctx context.Context, volume *v1.PersistentVolume return fmt.Errorf("invalid CSI PV") } + // If we run on a single node, then we shouldn't delete volumes + // that we didn't create. In practice, that means that the volume + // is accessible (only!) on this node. + if p.nodeDeployment != nil { + accessible, err := VolumeIsAccessible(volume.Spec.NodeAffinity, p.nodeDeployment.NodeInfo.AccessibleTopology) + if err != nil { + return fmt.Errorf("checking volume affinity failed: %v", err) + } + if !accessible { + return &controller.IgnoredError{ + Reason: "PV was not provisioned on this node", + } + } + } + volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle) rc := &requiredCapabilities{} @@ -1105,13 +1197,34 @@ func (p *csiProvisioner) SupportsBlock(ctx context.Context) bool { func (p *csiProvisioner) ShouldProvision(ctx context.Context, claim *v1.PersistentVolumeClaim) bool { provisioner := claim.Annotations[annStorageProvisioner] migratedTo := claim.Annotations[annMigratedTo] - if provisioner == p.driverName || migratedTo == p.driverName { - // Either CSI volume is requested or in-tree volume is migrated to CSI in PV controller - // and therefore PVC has CSI annotation. - return true + if provisioner != p.driverName && migratedTo != p.driverName { + // Non-migrated in-tree volume is requested. + return false + } + // Either CSI volume is requested or in-tree volume is migrated to CSI in PV controller + // and therefore PVC has CSI annotation. + // + // But before we start provisioning, check that we are (or can + // become) the owner if there are multiple provisioner instances. + // That we do this here is crucial because if we return false here, + // the claim will be ignored without logging an event for it. + // We don't want each provisioner instance to log events for the same + // claim unless they really need to do some work for it. 
+ owned, err := p.checkNode(ctx, claim, nil, "should provision") + if err == nil { + if !owned { + return false + } + } else { + // This is unexpected. Here we can only log it and let + // a provisioning attempt start. If that still fails, + // a proper event will be created. + klog.V(2).Infof("trying to become an owner of PVC %s/%s in advance failed, will try again during provisioning: %s", + claim.Namespace, claim.Name, err) } - // Non-migrated in-tree volume is requested. - return false + + // Start provisioning. + return true } //TODO use a unique volume handle from and to Id @@ -1123,6 +1236,230 @@ func (p *csiProvisioner) volumeHandleToId(handle string) string { return handle } +// checkNode optionally checks whether the PVC is assigned to the current node. +// If the PVC uses immediate binding, it will try to take the PVC for provisioning +// on the current node. Returns true if provisioning can proceed, an error +// in case of a failure that prevented checking. +func (p *csiProvisioner) checkNode(ctx context.Context, claim *v1.PersistentVolumeClaim, sc *storagev1.StorageClass, caller string) (provision bool, err error) { + if p.nodeDeployment == nil { + return true, nil + } + + var selectedNode string + if claim.Annotations != nil { + selectedNode = claim.Annotations[annSelectedNode] + } + switch selectedNode { + case "": + logger := klog.V(5) + if logger.Enabled() { + logger.Infof("%s: checking node for PVC %s/%s with resource version %s", caller, claim.Namespace, claim.Name, claim.ResourceVersion) + defer func() { + logger.Infof("%s: done checking node for PVC %s/%s with resource version %s: provision %v, err %v", caller, claim.Namespace, claim.Name, claim.ResourceVersion, provision, err) + }() + } + + if sc == nil { + var err error + sc, err = p.scLister.Get(*claim.Spec.StorageClassName) + if err != nil { + return false, err + } + } + if sc.VolumeBindingMode == nil || + *sc.VolumeBindingMode != storagev1.VolumeBindingImmediate || + !p.nodeDeployment.ImmediateBinding { + return false, nil + } + + // Try to select the current node if there is a chance of it + // being created there, i.e. there is currently enough free space (checked in becomeOwner). + // + // If later volume provisioning fails on this node, the annotation will be unset and node + // selection will happen again. If no other node picks up the volume, then the PVC remains + // in the queue and this check will be repeated from time to time. + // + // A lot of different external-provisioner instances will try to do this at the same time. + // To avoid the thundering herd problem, we sleep in becomeOwner for a short random amount of time + // (for new PVCs) or exponentially increasing time (for PVCs were we already had a conflict). + if err := p.nodeDeployment.becomeOwner(ctx, p, claim); err != nil { + return false, fmt.Errorf("PVC %s/%s: %v", claim.Namespace, claim.Name, err) + } + + // We are now either the owner or someone else is. We'll check when the updated PVC + // enters the workqueue and gets processed by sig-storage-lib-external-provisioner. + return false, nil + case p.nodeDeployment.NodeName: + // Our node is selected. + return true, nil + default: + // Some other node is selected, ignore it. 
+ return false, nil + } +} + +func (p *csiProvisioner) checkCapacity(ctx context.Context, claim *v1.PersistentVolumeClaim, selectedNodeName string) (bool, error) { + capacity := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)] + volSizeBytes := capacity.Value() + if volSizeBytes == 0 { + // Nothing to check. + return true, nil + } + + if claim.Spec.StorageClassName == nil { + return false, errors.New("empty storage class name") + } + sc, err := p.scLister.Get(*claim.Spec.StorageClassName) + if err != nil { + return false, err + } + + node, err := p.nodeLister.Get(selectedNodeName) + if err != nil { + return false, err + } + + result, _, err := p.prepareProvision(ctx, claim, sc, node) + if err != nil { + return false, err + } + + // In practice, we expect exactly one entry here once a node + // has been selected. But we have to be prepared for more than + // one (=> check all, success if there is at least one) and + // none (no node selected => check once without topology). + topologies := []*csi.Topology{nil} + if result.req.AccessibilityRequirements != nil && len(result.req.AccessibilityRequirements.Requisite) > 0 { + topologies = result.req.AccessibilityRequirements.Requisite + } + for _, topology := range topologies { + req := &csi.GetCapacityRequest{ + VolumeCapabilities: result.req.VolumeCapabilities, + Parameters: result.req.Parameters, + AccessibleTopology: topology, + } + klog.V(5).Infof("GetCapacityRequest %+v", req) + resp, err := p.csiClient.GetCapacity(ctx, req) + if err != nil { + return false, fmt.Errorf("GetCapacity: %v", err) + } + if volSizeBytes <= resp.AvailableCapacity { + // Enough capacity at the moment. + return true, nil + } + } + + // Currently not enough capacity anywhere. + return false, nil +} + +// becomeOwner updates the PVC with the current node as selected node. +// Returns an error if something unexpectedly failed, otherwise an updated PVC with +// the current node selected or nil if not the owner. +func (nc *internalNodeDeployment) becomeOwner(ctx context.Context, p *csiProvisioner, claim *v1.PersistentVolumeClaim) error { + requeues := nc.rateLimiter.NumRequeues(claim.UID) + delay := nc.rateLimiter.When(claim.UID) + klog.V(5).Infof("will try to become owner of PVC %s/%s with resource version %s in %s (attempt #%d)", claim.Namespace, claim.Name, claim.ResourceVersion, delay, requeues) + sleep, cancel := context.WithTimeout(ctx, delay) + defer cancel() + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + check := func() (bool, *v1.PersistentVolumeClaim, error) { + current, err := nc.ClaimInformer.Lister().PersistentVolumeClaims(claim.Namespace).Get(claim.Name) + if err != nil { + return false, nil, fmt.Errorf("PVC not found: %v", err) + } + if claim.UID != current.UID { + return false, nil, errors.New("PVC was replaced") + } + if current.Annotations != nil && current.Annotations[annSelectedNode] != "" && current.Annotations[annSelectedNode] != nc.NodeName { + return true, current, nil + } + return false, current, nil + } + var stop bool + var current *v1.PersistentVolumeClaim + var err error +loop: + for { + select { + case <-ctx.Done(): + return errors.New("timed out waiting to become PVC owner") + case <-sleep.Done(): + stop, current, err = check() + break loop + case <-ticker.C: + // Abort the waiting early if we know that someone else is the owner. 
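+				// Polling the informer cache every 10ms while sleeping lets this
+				// instance stop early once a competing instance has written its own
+				// name into the selected-node annotation, instead of waiting for the
+				// full randomized delay to elapse.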
+ stop, current, err = check() + if err != nil || stop { + break loop + } + } + } + if err != nil { + return err + } + if stop { + // Some other instance was faster and we don't need to provision for + // this PVC. If the PVC needs to be rescheduled, we start the delay from scratch. + nc.rateLimiter.Forget(claim.UID) + klog.V(5).Infof("did not become owner of PVC %s/%s with resource revision %s, now owned by %s with resource revision %s", + claim.Namespace, claim.Name, claim.ResourceVersion, + current.Annotations[annSelectedNode], current.ResourceVersion) + return nil + } + + // Check capacity as late as possible before trying to become the owner, because that is a + // relatively expensive operation. + // + // The exact same parameters are computed here as if we were provisioning. If a precondition + // is violated, like "storage class does not exist", then we have two options: + // - silently ignore the problem, but if all instances do that, the problem is not surfaced + // to the user + // - try to become the owner and let provisioning start, which then will probably + // fail the same way, but then has a chance to inform the user via events + // + // We do the latter. + hasCapacity, err := p.checkCapacity(ctx, claim, p.nodeDeployment.NodeName) + if err != nil { + klog.V(3).Infof("proceeding with becoming owner although the capacity check failed: %v", err) + } else if !hasCapacity { + // Don't try to provision. + klog.V(5).Infof("not enough capacity for PVC %s/%s with resource revision %s", claim.Namespace, claim.Name, claim.ResourceVersion) + return nil + } + + // Update PVC with our node as selected node if necessary. + current = current.DeepCopy() + if current.Annotations == nil { + current.Annotations = map[string]string{} + } + if current.Annotations[annSelectedNode] == nc.NodeName { + // A mere sanity check. Should not happen. + klog.V(5).Infof("already owner of PVC %s/%s with updated resource version %s", current.Namespace, current.Name, current.ResourceVersion) + return nil + } + current.Annotations[annSelectedNode] = nc.NodeName + klog.V(5).Infof("trying to become owner of PVC %s/%s with resource version %s now", current.Namespace, current.Name, current.ResourceVersion) + current, err = p.client.CoreV1().PersistentVolumeClaims(current.Namespace).Update(ctx, current, metav1.UpdateOptions{}) + if err != nil { + // Next attempt will use a longer delay and most likely + // stop quickly once we see who owns the PVC now. + if apierrors.IsConflict(err) { + // Lost the race or some other concurrent modification. Repeat the attempt. + klog.V(3).Infof("conflict during PVC %s/%s update, will try again", claim.Namespace, claim.Name) + return nc.becomeOwner(ctx, p, claim) + } + // Some unexpected error. Report it. + return fmt.Errorf("selecting node %q for PVC failed: %v", nc.NodeName, err) + } + + // Successfully became owner. Future delays will be smaller again. + nc.rateLimiter.Forget(claim.UID) + klog.V(5).Infof("became owner of PVC %s/%s with updated resource version %s", current.Namespace, current.Name, current.ResourceVersion) + return nil +} + // verifyAndGetSecretNameAndNamespaceTemplate gets the values (templates) associated // with the parameters specified in "secret" and verifies that they are specified correctly. 
func verifyAndGetSecretNameAndNamespaceTemplate(secret secretParamsMap, storageClassParams map[string]string) (nameTemplate, namespaceTemplate string, err error) { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index e656dc0b0d..dd2b338aee 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" fakeclientset "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" @@ -61,8 +62,9 @@ func init() { } const ( - timeout = 10 * time.Second - driverName = "test-driver" + timeout = 10 * time.Second + driverName = "test-driver" + driverTopologyKey = "test-driver-node" ) var ( @@ -413,7 +415,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", - 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, false) + 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, nil) // Requested PVC with requestedBytes storage deletePolicy := v1.PersistentVolumeReclaimDelete @@ -485,6 +487,8 @@ func provisionFromPVCCapabilities() (rpc.PluginCapabilitySet, rpc.ControllerCapa } } +var fakeSCName = "fake-test-sc" + func createFakeNamedPVC(requestBytes int64, name string, userAnnotations map[string]string) *v1.PersistentVolumeClaim { annotations := map[string]string{annStorageProvisioner: driverName} for k, v := range userAnnotations { @@ -505,6 +509,7 @@ func createFakeNamedPVC(requestBytes int64, name string, userAnnotations map[str v1.ResourceName(v1.ResourceStorage): resource.MustParse(strconv.FormatInt(requestBytes, 10)), }, }, + StorageClassName: &fakeSCName, }, } } @@ -798,21 +803,24 @@ func TestGetSecretReference(t *testing.T) { } type provisioningTestcase struct { - volOpts controller.ProvisionOptions - notNilSelector bool - makeVolumeNameErr bool - getSecretRefErr bool - getCredentialsErr bool - volWithLessCap bool - volWithZeroCap bool - expectedPVSpec *pvSpec - clientSetObjects []runtime.Object - createVolumeError error - expectErr bool - expectState controller.ProvisioningState - expectCreateVolDo interface{} - withExtraMetadata bool - skipCreateVolume bool + volOpts controller.ProvisionOptions + notNilSelector bool + makeVolumeNameErr bool + getSecretRefErr bool + getCredentialsErr bool + volWithLessCap bool + volWithZeroCap bool + expectedPVSpec *pvSpec + clientSetObjects []runtime.Object + createVolumeError error + expectErr bool + expectState controller.ProvisioningState + expectCreateVolDo interface{} + withExtraMetadata bool + skipCreateVolume bool + deploymentNode string // fake distributed provisioning with this node as host + immediateBinding bool // enable immediate binding support for distributed provisioning + expectSelectedNode string // a specific selected-node of the PVC in the apiserver after the test, same as before if empty } type provisioningFSTypeTestcase struct { @@ -1004,14 +1012,27 @@ func TestFSTypeProvision(t *testing.T) { } for k, tc := range testcases { - runFSTypeProvisionTest(t, k, tc, requestedBytes, driverName, "" /* no migration */) + t.Run(k, func(t 
*testing.T) { + runFSTypeProvisionTest(t, k, tc, requestedBytes, driverName, "" /* no migration */) + }) } } func TestProvision(t *testing.T) { var requestedBytes int64 = 100 deletePolicy := v1.PersistentVolumeReclaimDelete + immediateBinding := storagev1.VolumeBindingImmediate apiGrp := "my.example.io" + nodeFoo := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + nodeBar := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + }, + } testcases := map[string]provisioningTestcase{ "normal provision": { volOpts: controller.ProvisionOptions{ @@ -1802,10 +1823,149 @@ func TestProvision(t *testing.T) { expectErr: true, skipCreateVolume: true, }, + "distributed, right node selected": { + deploymentNode: "foo", + volOpts: controller.ProvisionOptions{ + SelectedNode: nodeFoo, + StorageClass: &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: fakeSCName, + }, + ReclaimPolicy: &deletePolicy, + Parameters: map[string]string{ + "fstype": "ext3", + }, + }, + PVName: "test-name", + PVC: func() *v1.PersistentVolumeClaim { + claim := createFakePVC(requestedBytes) + claim.Annotations[annSelectedNode] = nodeFoo.Name + return claim + }(), + }, + expectedPVSpec: &pvSpec{ + Name: "test-testi", + ReclaimPolicy: v1.PersistentVolumeReclaimDelete, + Capacity: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(requestedBytes), + }, + CSIPVS: &v1.CSIPersistentVolumeSource{ + Driver: "test-driver", + VolumeHandle: "test-volume-id", + FSType: "ext3", + VolumeAttributes: map[string]string{ + "storage.kubernetes.io/csiProvisionerIdentity": "test-provisioner", + }, + }, + }, + expectState: controller.ProvisioningFinished, + }, + "distributed, no node selected": { + deploymentNode: "foo", + volOpts: controller.ProvisionOptions{ + StorageClass: &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: fakeSCName, + }, + ReclaimPolicy: &deletePolicy, + Parameters: map[string]string{ + "fstype": "ext3", + }, + }, + PVName: "test-name", + PVC: createFakePVC(requestedBytes), + }, + expectErr: true, + expectState: controller.ProvisioningNoChange, + }, + "distributed, wrong node selected": { + deploymentNode: "foo", + volOpts: controller.ProvisionOptions{ + SelectedNode: nodeBar, + StorageClass: &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: fakeSCName, + }, + ReclaimPolicy: &deletePolicy, + Parameters: map[string]string{ + "fstype": "ext3", + }, + }, + PVName: "test-name", + PVC: func() *v1.PersistentVolumeClaim { + claim := createFakePVC(requestedBytes) + claim.Annotations[annSelectedNode] = nodeBar.Name + return claim + }(), + }, + expectErr: true, + expectState: controller.ProvisioningNoChange, + }, + "distributed immediate, right node selected": { + deploymentNode: "foo", + immediateBinding: true, + volOpts: controller.ProvisionOptions{ + SelectedNode: nodeFoo, + StorageClass: &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: fakeSCName, + }, + ReclaimPolicy: &deletePolicy, + Parameters: map[string]string{ + "fstype": "ext3", + }, + }, + PVName: "test-name", + PVC: func() *v1.PersistentVolumeClaim { + claim := createFakePVC(requestedBytes) + claim.Annotations[annSelectedNode] = nodeFoo.Name + return claim + }(), + }, + expectedPVSpec: &pvSpec{ + Name: "test-testi", + ReclaimPolicy: v1.PersistentVolumeReclaimDelete, + Capacity: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(requestedBytes), + }, + CSIPVS: &v1.CSIPersistentVolumeSource{ + Driver: "test-driver", + VolumeHandle: "test-volume-id", 
+ FSType: "ext3", + VolumeAttributes: map[string]string{ + "storage.kubernetes.io/csiProvisionerIdentity": "test-provisioner", + }, + }, + }, + expectState: controller.ProvisioningFinished, + }, + "distributed immediate, no node selected": { + deploymentNode: "foo", + immediateBinding: true, + volOpts: controller.ProvisionOptions{ + StorageClass: &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: fakeSCName, + }, + ReclaimPolicy: &deletePolicy, + Parameters: map[string]string{ + "fstype": "ext3", + }, + VolumeBindingMode: &immediateBinding, + }, + PVName: "test-name", + PVC: createFakePVC(requestedBytes), + }, + expectErr: true, + expectState: controller.ProvisioningNoChange, + expectSelectedNode: nodeFoo.Name, + }, } for k, tc := range testcases { - runProvisionTest(t, k, tc, requestedBytes, driverName, "" /* no migration */) + t.Run(k, func(t *testing.T) { + runProvisionTest(t, k, tc, requestedBytes, driverName, "" /* no migration */) + }) } } @@ -1856,7 +2016,7 @@ func runFSTypeProvisionTest(t *testing.T, k string, tc provisioningFSTypeTestcas myDefaultfsType = "" } csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), nil, nil, nil, nil, nil, false, myDefaultfsType, false) + nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), nil, nil, nil, nil, nil, false, myDefaultfsType, nil) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ CapacityBytes: requestedBytes, @@ -1929,17 +2089,12 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested defer mockController.Finish() defer driver.Stop() - clientSet := fakeclientset.NewSimpleClientset(tc.clientSetObjects...) - pluginCaps, controllerCaps := provisionCapabilities() - csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), nil, nil, nil, nil, nil, tc.withExtraMetadata, defaultfsType, false) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ CapacityBytes: requestedBytes, VolumeId: "test-volume-id", }, } - if tc.notNilSelector { tc.volOpts.PVC.Spec.Selector = &metav1.LabelSelector{} } else if tc.makeVolumeNameErr { @@ -1965,6 +2120,36 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested } } + expectSelectedNode := tc.expectSelectedNode + objects := tc.clientSetObjects + if tc.volOpts.PVC != nil { + objects = append(objects, tc.volOpts.PVC) + if expectSelectedNode == "" && tc.volOpts.PVC.Annotations != nil { + expectSelectedNode = tc.volOpts.PVC.Annotations[annSelectedNode] + } + } + if tc.volOpts.StorageClass != nil { + objects = append(objects, tc.volOpts.StorageClass) + } + clientSet := fakeclientset.NewSimpleClientset(objects...) 
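+	// These informers are created from the fake clientset and are not started
+	// here; the PVC is added to the claim informer's store directly (see below)
+	// so that the node-deployment code paths can read it without an apiserver.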
+ informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + claimInformer := informerFactory.Core().V1().PersistentVolumeClaims() + scInformer := informerFactory.Storage().V1().StorageClasses() + + var nodeDeployment *NodeDeployment + if tc.deploymentNode != "" { + nodeDeployment = &NodeDeployment{ + NodeName: tc.deploymentNode, + ClaimInformer: claimInformer, + ImmediateBinding: tc.immediateBinding, + } + } + + pluginCaps, controllerCaps := provisionCapabilities() + csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, + nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, true, csitrans.New(), scInformer.Lister(), nil, nil, nil, nil, tc.withExtraMetadata, defaultfsType, nodeDeployment) + + claimInformer.Informer().GetStore().Add(tc.volOpts.PVC) pv, state, err := csiProvisioner.Provision(context.Background(), tc.volOpts) if tc.expectErr && err == nil { t.Errorf("test %q: Expected error, got none", k) @@ -2010,7 +2195,17 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested t.Errorf("test %q: expected PV: %v, got: %v", k, tc.expectedPVSpec.CSIPVS, pv.Spec.PersistentVolumeSource.CSI) } } + } + if expectSelectedNode != "" { + claim, err := clientSet.CoreV1().PersistentVolumeClaims(tc.volOpts.PVC.Namespace).Get(context.Background(), tc.volOpts.PVC.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("PVC %s not found: %v", tc.volOpts.PVC.Name, err) + } else if claim.Annotations == nil { + t.Errorf("PVC %s has no annotations", claim.Name) + } else if claim.Annotations[annSelectedNode] != expectSelectedNode { + t.Errorf("expected selected node %q, got %q", expectSelectedNode, claim.Annotations[annSelectedNode]) + } } } @@ -2071,7 +2266,7 @@ func TestProvisionFromSnapshot(t *testing.T) { CSIPVS *v1.CSIPersistentVolumeSource } - testcases := map[string]struct { + type testcase struct { volOpts controller.ProvisionOptions restoredVolSizeSmall bool wrongDataSource bool @@ -2088,7 +2283,8 @@ func TestProvisionFromSnapshot(t *testing.T) { nilReadyToUse bool nilContentStatus bool nilSnapshotHandle bool - }{ + } + testcases := map[string]testcase{ "provision with volume snapshot data source": { volOpts: controller.ProvisionOptions{ StorageClass: &storagev1.StorageClass{ @@ -2626,7 +2822,7 @@ func TestProvisionFromSnapshot(t *testing.T) { defer mockController.Finish() defer driver.Stop() - for k, tc := range testcases { + doit := func(t *testing.T, tc testcase) { var clientSet kubernetes.Interface clientSet = fakeclientset.NewSimpleClientset() client := &fake.Clientset{} @@ -2667,7 +2863,7 @@ func TestProvisionFromSnapshot(t *testing.T) { pluginCaps, controllerCaps := provisionFromSnapshotCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - client, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, false) + client, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, nil) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -2704,31 +2900,37 @@ func TestProvisionFromSnapshot(t *testing.T) { pv, _, err := csiProvisioner.Provision(context.Background(), tc.volOpts) if tc.expectErr && err == nil { - t.Errorf("test %q: Expected error, got none", k) + t.Errorf("Expected error, got none") } if !tc.expectErr && err != nil { - t.Errorf("test %q: got error: %v", 
k, err) + t.Errorf("got error: %v", err) } if tc.expectedPVSpec != nil { if pv != nil { if pv.Name != tc.expectedPVSpec.Name { - t.Errorf("test %q: expected PV name: %q, got: %q", k, tc.expectedPVSpec.Name, pv.Name) + t.Errorf("expected PV name: %q, got: %q", tc.expectedPVSpec.Name, pv.Name) } if !reflect.DeepEqual(pv.Spec.Capacity, tc.expectedPVSpec.Capacity) { - t.Errorf("test %q: expected capacity: %v, got: %v", k, tc.expectedPVSpec.Capacity, pv.Spec.Capacity) + t.Errorf("expected capacity: %v, got: %v", tc.expectedPVSpec.Capacity, pv.Spec.Capacity) } if tc.expectedPVSpec.CSIPVS != nil { if !reflect.DeepEqual(pv.Spec.PersistentVolumeSource.CSI, tc.expectedPVSpec.CSIPVS) { - t.Errorf("test %q: expected PV: %v, got: %v", k, tc.expectedPVSpec.CSIPVS, pv.Spec.PersistentVolumeSource.CSI) + t.Errorf("expected PV: %v, got: %v", tc.expectedPVSpec.CSIPVS, pv.Spec.PersistentVolumeSource.CSI) } } } } } + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + doit(t, tc) + }) + } } // TestProvisionWithTopology is a basic test of provisioner integration with topology functions. @@ -2841,7 +3043,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) { defer close(stopChan) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType, false) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType, nil) pv, _, err := csiProvisioner.Provision(context.Background(), controller.ProvisionOptions{ StorageClass: &storagev1.StorageClass{}, @@ -2935,7 +3137,7 @@ func TestProvisionErrorHandling(t *testing.T) { defer close(stopChan) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType, false) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, csiNodeLister, nodeLister, claimLister, vaLister, false, defaultfsType, nil) options := controller.ProvisionOptions{ StorageClass: &storagev1.StorageClass{}, @@ -3008,7 +3210,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) { clientSet := fakeclientset.NewSimpleClientset() pluginCaps, controllerCaps := provisionWithTopologyCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, false) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, nil, nil, false, defaultfsType, nil) out := &csi.CreateVolumeResponse{ Volume: &csi.Volume{ @@ -3044,6 +3246,7 @@ type deleteTestcase struct { storageClass *storagev1.StorageClass volumeAttachment *storagev1.VolumeAttachment mockDelete bool + deploymentNode string // fake distributed provisioning with this node as host expectErr bool } @@ -3310,10 +3513,101 @@ func TestDelete(t *testing.T) { expectErr: false, mockDelete: true, }, + "distributed, ignore PV from other host": { + deploymentNode: "foo", + persistentVolume: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pv", + }, + 
Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + VolumeHandle: "vol-id-1", + }, + }, + ClaimRef: &v1.ObjectReference{ + Name: "pvc-name", + }, + NodeAffinity: &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: driverTopologyKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{ + "bar", + }, + }, + }, + }, + }, + }, + }, + }, + }, + storageClass: &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "sc-name", + }, + Parameters: map[string]string{ + prefixedProvisionerSecretNameKey: "static-${pv.name}-${pvc.namespace}-${pvc.name}", + }, + }, + expectErr: true, + }, + "distributed, delete PV from our host": { + deploymentNode: "foo", + persistentVolume: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pv", + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + CSI: &v1.CSIPersistentVolumeSource{ + VolumeHandle: "vol-id-1", + }, + }, + ClaimRef: &v1.ObjectReference{ + Name: "pvc-name", + }, + NodeAffinity: &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: driverTopologyKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{ + "foo", + }, + }, + }, + }, + }, + }, + }, + }, + }, + storageClass: &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "sc-name", + }, + Parameters: map[string]string{ + prefixedProvisionerSecretNameKey: "static-${pv.name}-${pvc.namespace}-${pvc.name}", + }, + }, + expectErr: false, + mockDelete: true, + }, } for k, tc := range tt { - runDeleteTest(t, k, tc) + t.Run(k, func(t *testing.T) { + runDeleteTest(t, k, tc) + }) } } @@ -3338,6 +3632,25 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) { clientSet = fakeclientset.NewSimpleClientset() } + informerFactory := informers.NewSharedInformerFactory(clientSet, 0) + claimInformer := informerFactory.Core().V1().PersistentVolumeClaims() + + var nodeDeployment *NodeDeployment + if tc.deploymentNode != "" { + nodeDeployment = &NodeDeployment{ + NodeName: tc.deploymentNode, + ClaimInformer: claimInformer, + NodeInfo: csi.NodeGetInfoResponse{ + NodeId: tc.deploymentNode, + AccessibleTopology: &csi.Topology{ + Segments: map[string]string{ + driverTopologyKey: tc.deploymentNode, + }, + }, + }, + } + } + if tc.mockDelete { controllerServer.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&csi.DeleteVolumeResponse{}, nil).Times(1) } @@ -3345,7 +3658,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) { pluginCaps, controllerCaps := provisionCapabilities() scLister, _, _, _, vaLister, _ := listers(clientSet) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, - csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, nil, nil, nil, vaLister, false, defaultfsType, false) + csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), scLister, nil, nil, nil, vaLister, false, defaultfsType, nodeDeployment) err = csiProvisioner.Delete(context.Background(), tc.persistentVolume) if tc.expectErr && err == nil { @@ -3766,7 +4079,7 @@ func TestProvisionFromPVC(t *testing.T) { // Phase: execute the test csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, - 
nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, claimLister, nil, false, defaultfsType, false) + nil, driverName, pluginCaps, controllerCaps, "", false, true, csitrans.New(), nil, nil, nil, claimLister, nil, false, defaultfsType, nil) pv, _, err = csiProvisioner.Provision(context.Background(), tc.volOpts) if tc.expectErr && err == nil { @@ -3884,7 +4197,7 @@ func TestProvisionWithMigration(t *testing.T) { pluginCaps, controllerCaps := provisionCapabilities() csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, - inTreePluginName, false, true, mockTranslator, nil, nil, nil, nil, nil, false, defaultfsType, false) + inTreePluginName, false, true, mockTranslator, nil, nil, nil, nil, nil, false, defaultfsType, nil) // Set up return values (AnyTimes to avoid overfitting on implementation) @@ -4046,7 +4359,7 @@ func TestDeleteMigration(t *testing.T) { defer close(stopCh) csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", - false, true, mockTranslator, nil, nil, nil, nil, vaLister, false, defaultfsType, false) + false, true, mockTranslator, nil, nil, nil, nil, vaLister, false, defaultfsType, nil) // Set mock return values (AnyTimes to avoid overfitting on implementation details) mockTranslator.EXPECT().IsPVMigratable(gomock.Any()).Return(tc.expectTranslation).AnyTimes() diff --git a/pkg/controller/ratelimiter.go b/pkg/controller/ratelimiter.go new file mode 100644 index 0000000000..e139019d42 --- /dev/null +++ b/pkg/controller/ratelimiter.go @@ -0,0 +1,53 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "math/rand" + "sync" + "time" + + "k8s.io/client-go/util/workqueue" +) + +type rateLimiterWithJitter struct { + workqueue.RateLimiter + baseDelay time.Duration + rd *rand.Rand + mutex sync.Mutex +} + +func (r *rateLimiterWithJitter) When(item interface{}) time.Duration { + r.mutex.Lock() + defer r.mutex.Unlock() + + delay := r.RateLimiter.When(item).Nanoseconds() + percentage := r.rd.Float64() + jitter := int64(float64(r.baseDelay.Nanoseconds()) * percentage) + if jitter > delay { + return 0 + } + return time.Duration(delay - jitter) +} + +func newItemExponentialFailureRateLimiterWithJitter(baseDelay time.Duration, maxDelay time.Duration) workqueue.RateLimiter { + return &rateLimiterWithJitter{ + RateLimiter: workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay), + baseDelay: baseDelay, + rd: rand.New(rand.NewSource(time.Now().UTC().UnixNano())), + } +} diff --git a/pkg/controller/ratelimiter_test.go b/pkg/controller/ratelimiter_test.go new file mode 100644 index 0000000000..514f748d9c --- /dev/null +++ b/pkg/controller/ratelimiter_test.go @@ -0,0 +1,37 @@ +/* +Copyright 2020 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + "time" +) + +const factorMaxDelay = 10 + +func TestRateLimiter(t *testing.T) { + maxDelay := factorMaxDelay * time.Second + rd := newItemExponentialFailureRateLimiterWithJitter(time.Second, maxDelay) + + for i := 0; i < 100; i++ { + backoff := rd.When(1) + if backoff > maxDelay || backoff < 0 { + t.Errorf("expected value > 0, < %s, got %s", maxDelay, backoff) + } + rd.Forget(1) + } +} diff --git a/pkg/controller/topology.go b/pkg/controller/topology.go index 0285dc2366..2ec56e451c 100644 --- a/pkg/controller/topology.go +++ b/pkg/controller/topology.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" storagelistersv1 "k8s.io/client-go/listers/storage/v1" + corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" ) @@ -72,6 +73,26 @@ func GenerateVolumeNodeAffinity(accessibleTopology []*csi.Topology) *v1.VolumeNo } } +// VolumeIsAccessible checks whether the generated volume affinity is satisfied by +// a the node topology that a CSI driver reported in GetNodeInfoResponse. +func VolumeIsAccessible(affinity *v1.VolumeNodeAffinity, nodeTopology *csi.Topology) (bool, error) { + if nodeTopology == nil || affinity == nil || affinity.Required == nil { + // No topology information -> all volumes accessible. + return true, nil + } + + nodeLabels := labels.Set{} + for k, v := range nodeTopology.Segments { + nodeLabels[k] = v + } + node := v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: nodeLabels, + }, + } + return corev1helpers.MatchNodeSelectorTerms(&node, affinity.Required) +} + // SupportsTopology returns whether topology is supported both for plugin and external provisioner func SupportsTopology(pluginCapabilities rpc.PluginCapabilitySet) bool { return pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] && diff --git a/vendor/k8s.io/component-helpers/LICENSE b/vendor/k8s.io/component-helpers/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/vendor/k8s.io/component-helpers/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/k8s.io/component-helpers/scheduling/corev1/doc.go b/vendor/k8s.io/component-helpers/scheduling/corev1/doc.go new file mode 100644 index 0000000000..c6cf8132af --- /dev/null +++ b/vendor/k8s.io/component-helpers/scheduling/corev1/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2020 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package corev1 defines functions which should satisfy one of the following: +// +// - Be used by more than one core component (kube-scheduler, kubelet, kube-apiserver, etc.) +// - Be used by a core component and another kubernetes project (cluster-autoscaler, descheduler) +// +// And be a scheduling feature. +package corev1 // import "k8s.io/component-helpers/scheduling/corev1" diff --git a/vendor/k8s.io/component-helpers/scheduling/corev1/helpers.go b/vendor/k8s.io/component-helpers/scheduling/corev1/helpers.go new file mode 100644 index 0000000000..57f916c265 --- /dev/null +++ b/vendor/k8s.io/component-helpers/scheduling/corev1/helpers.go @@ -0,0 +1,45 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package corev1 + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" +) + +// PodPriority returns priority of the given pod. +func PodPriority(pod *v1.Pod) int32 { + if pod.Spec.Priority != nil { + return *pod.Spec.Priority + } + // When priority of a running pod is nil, it means it was created at a time + // that there was no global default priority class and the priority class + // name of the pod was empty. So, we resolve to the static default priority. + return 0 +} + +// MatchNodeSelectorTerms checks whether the node labels and fields match node selector terms in ORed; +// nil or empty term matches no objects. +func MatchNodeSelectorTerms( + node *v1.Node, + nodeSelector *v1.NodeSelector, +) (bool, error) { + if node == nil { + return false, nil + } + return nodeaffinity.NewLazyErrorNodeSelector(nodeSelector).Match(node) +} diff --git a/vendor/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity.go b/vendor/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity.go new file mode 100644 index 0000000000..efca431e61 --- /dev/null +++ b/vendor/k8s.io/component-helpers/scheduling/corev1/nodeaffinity/nodeaffinity.go @@ -0,0 +1,262 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeaffinity + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/errors" +) + +// NodeSelector is a runtime representation of v1.NodeSelector. +type NodeSelector struct { + lazy LazyErrorNodeSelector +} + +// LazyErrorNodeSelector is a runtime representation of v1.NodeSelector that +// only reports parse errors when no terms match. +type LazyErrorNodeSelector struct { + terms []nodeSelectorTerm +} + +// NewNodeSelector returns a NodeSelector or all parsing errors found. +func NewNodeSelector(ns *v1.NodeSelector) (*NodeSelector, error) { + lazy := NewLazyErrorNodeSelector(ns) + var errs []error + for _, term := range lazy.terms { + if term.parseErr != nil { + errs = append(errs, term.parseErr) + } + } + if len(errs) != 0 { + return nil, errors.NewAggregate(errs) + } + return &NodeSelector{lazy: *lazy}, nil +} + +// NewLazyErrorNodeSelector creates a NodeSelector that only reports parse +// errors when no terms match. +func NewLazyErrorNodeSelector(ns *v1.NodeSelector) *LazyErrorNodeSelector { + parsedTerms := make([]nodeSelectorTerm, 0, len(ns.NodeSelectorTerms)) + for _, term := range ns.NodeSelectorTerms { + // nil or empty term selects no objects + if isEmptyNodeSelectorTerm(&term) { + continue + } + parsedTerms = append(parsedTerms, newNodeSelectorTerm(&term)) + } + return &LazyErrorNodeSelector{ + terms: parsedTerms, + } +} + +// Match checks whether the node labels and fields match the selector terms, ORed; +// nil or empty term matches no objects. +func (ns *NodeSelector) Match(node *v1.Node) bool { + // parse errors are reported in NewNodeSelector. + match, _ := ns.lazy.Match(node) + return match +} + +// Match checks whether the node labels and fields match the selector terms, ORed; +// nil or empty term matches no objects. +// Parse errors are only returned if no terms matched. +func (ns *LazyErrorNodeSelector) Match(node *v1.Node) (bool, error) { + if node == nil { + return false, nil + } + nodeLabels := labels.Set(node.Labels) + nodeFields := extractNodeFields(node) + + var errs []error + for _, term := range ns.terms { + match, err := term.match(nodeLabels, nodeFields) + if err != nil { + errs = append(errs, term.parseErr) + continue + } + if match { + return true, nil + } + } + return false, errors.NewAggregate(errs) +} + +// PreferredSchedulingTerms is a runtime representation of []v1.PreferredSchedulingTerms. +type PreferredSchedulingTerms struct { + terms []preferredSchedulingTerm +} + +// NewPreferredSchedulingTerms returns a PreferredSchedulingTerms or all the parsing errors found. +// If a v1.PreferredSchedulingTerm has a 0 weight, its parsing is skipped. 
+func NewPreferredSchedulingTerms(terms []v1.PreferredSchedulingTerm) (*PreferredSchedulingTerms, error) { + var errs []error + parsedTerms := make([]preferredSchedulingTerm, 0, len(terms)) + for _, term := range terms { + if term.Weight == 0 || isEmptyNodeSelectorTerm(&term.Preference) { + continue + } + parsedTerm := preferredSchedulingTerm{ + nodeSelectorTerm: newNodeSelectorTerm(&term.Preference), + weight: int(term.Weight), + } + if parsedTerm.parseErr != nil { + errs = append(errs, parsedTerm.parseErr) + } else { + parsedTerms = append(parsedTerms, parsedTerm) + } + } + if len(errs) != 0 { + return nil, errors.NewAggregate(errs) + } + return &PreferredSchedulingTerms{terms: parsedTerms}, nil +} + +// Score returns a score for a Node: the sum of the weights of the terms that +// match the Node. +func (t *PreferredSchedulingTerms) Score(node *v1.Node) int64 { + var score int64 + nodeLabels := labels.Set(node.Labels) + nodeFields := extractNodeFields(node) + for _, term := range t.terms { + // parse errors are reported in NewPreferredSchedulingTerms. + if ok, _ := term.match(nodeLabels, nodeFields); ok { + score += int64(term.weight) + } + } + return score +} + +func isEmptyNodeSelectorTerm(term *v1.NodeSelectorTerm) bool { + return len(term.MatchExpressions) == 0 && len(term.MatchFields) == 0 +} + +func extractNodeFields(n *v1.Node) fields.Set { + f := make(fields.Set) + if len(n.Name) > 0 { + f["metadata.name"] = n.Name + } + return f +} + +type nodeSelectorTerm struct { + matchLabels labels.Selector + matchFields fields.Selector + parseErr error +} + +func newNodeSelectorTerm(term *v1.NodeSelectorTerm) nodeSelectorTerm { + var parsedTerm nodeSelectorTerm + if len(term.MatchExpressions) != 0 { + parsedTerm.matchLabels, parsedTerm.parseErr = nodeSelectorRequirementsAsSelector(term.MatchExpressions) + if parsedTerm.parseErr != nil { + return parsedTerm + } + } + if len(term.MatchFields) != 0 { + parsedTerm.matchFields, parsedTerm.parseErr = nodeSelectorRequirementsAsFieldSelector(term.MatchFields) + } + return parsedTerm +} + +func (t *nodeSelectorTerm) match(nodeLabels labels.Set, nodeFields fields.Set) (bool, error) { + if t.parseErr != nil { + return false, t.parseErr + } + if t.matchLabels != nil && !t.matchLabels.Matches(nodeLabels) { + return false, nil + } + if t.matchFields != nil && len(nodeFields) > 0 && !t.matchFields.Matches(nodeFields) { + return false, nil + } + return true, nil +} + +// nodeSelectorRequirementsAsSelector converts the []NodeSelectorRequirement api type into a struct that implements +// labels.Selector. 
+func nodeSelectorRequirementsAsSelector(nsm []v1.NodeSelectorRequirement) (labels.Selector, error) { + if len(nsm) == 0 { + return labels.Nothing(), nil + } + selector := labels.NewSelector() + for _, expr := range nsm { + var op selection.Operator + switch expr.Operator { + case v1.NodeSelectorOpIn: + op = selection.In + case v1.NodeSelectorOpNotIn: + op = selection.NotIn + case v1.NodeSelectorOpExists: + op = selection.Exists + case v1.NodeSelectorOpDoesNotExist: + op = selection.DoesNotExist + case v1.NodeSelectorOpGt: + op = selection.GreaterThan + case v1.NodeSelectorOpLt: + op = selection.LessThan + default: + return nil, fmt.Errorf("%q is not a valid node selector operator", expr.Operator) + } + r, err := labels.NewRequirement(expr.Key, op, expr.Values) + if err != nil { + return nil, err + } + selector = selector.Add(*r) + } + return selector, nil +} + +// nodeSelectorRequirementsAsFieldSelector converts the []NodeSelectorRequirement core type into a struct that implements +// fields.Selector. +func nodeSelectorRequirementsAsFieldSelector(nsr []v1.NodeSelectorRequirement) (fields.Selector, error) { + if len(nsr) == 0 { + return fields.Nothing(), nil + } + + var selectors []fields.Selector + for _, expr := range nsr { + switch expr.Operator { + case v1.NodeSelectorOpIn: + if len(expr.Values) != 1 { + return nil, fmt.Errorf("unexpected number of value (%d) for node field selector operator %q", + len(expr.Values), expr.Operator) + } + selectors = append(selectors, fields.OneTermEqualSelector(expr.Key, expr.Values[0])) + + case v1.NodeSelectorOpNotIn: + if len(expr.Values) != 1 { + return nil, fmt.Errorf("unexpected number of value (%d) for node field selector operator %q", + len(expr.Values), expr.Operator) + } + selectors = append(selectors, fields.OneTermNotEqualSelector(expr.Key, expr.Values[0])) + + default: + return nil, fmt.Errorf("%q is not a valid node field selector operator", expr.Operator) + } + } + + return fields.AndSelectors(selectors...), nil +} + +type preferredSchedulingTerm struct { + nodeSelectorTerm + weight int +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d81bfa2449..8be32f5316 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -571,6 +571,10 @@ k8s.io/component-base/featuregate k8s.io/component-base/featuregate/testing k8s.io/component-base/metrics k8s.io/component-base/version +# k8s.io/component-helpers v0.20.0 => k8s.io/component-helpers v0.20.0 +## explicit +k8s.io/component-helpers/scheduling/corev1 +k8s.io/component-helpers/scheduling/corev1/nodeaffinity # k8s.io/csi-translation-lib v0.20.0 => k8s.io/csi-translation-lib v0.20.0 ## explicit k8s.io/csi-translation-lib From 53c168e3a9d2df9e674a9f43eec04a4f85831334 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 2 Dec 2020 15:21:31 +0100 Subject: [PATCH 3/7] add --local-topology When deploying external-provisioner on each node, the topology information that it needs is most likely just the values reported by the local CSI driver instance. We can avoid the extra work for watching Node and CSINode in that case. --- README.md | 13 +++--- cmd/csi-provisioner/csi-provisioner.go | 59 +++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 97286f1525..6d8be828ef 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,8 @@ See the [storage capacity section](#capacity-support) below for details. 
* `--node-deployment-max-delay`: Determines how long the external-provisioner sleeps at most before trying to own a PVC with immediate binding. Defaults to 60 seconds. +* `--local-topology`: Instead of watching Node and CSINode objects, use only the topology provided by the CSI driver. Only valid in combination with `--node-deployment`. Disabled by default, but recommended for drivers which have a single topology key with different values for each node (i.e. local volumes). + #### Other recognized arguments * `--feature-gates `: A set of comma separated `=` pairs that describe feature gates for alpha/experimental features. See [list of features](#feature-status) or `--help` output for list of recognized features. Example: `--feature-gates Topology=true` to enable Topology feature that's disabled by default. @@ -271,6 +273,8 @@ each CSI driver on different nodes. The CSI driver deployment must: to match the expected cluster size and desired response times (only relevant when there are storage classes with immediate binding, see below for details) +- use `--local-topology` if volumes are only accessible inside the node + where they get provisioned - set the `NODE_NAME` environment variable to the name of the Kubernetes node - implement `GetCapacity` @@ -303,11 +307,10 @@ can own it. The `--node-deployment-base-delay` parameter determines the initial wait period. It also sets the jitter, so in practice the initial wait period will be -in the range from zero to the base delay. After a collision, the delay -increases exponentially. If the value is high, volumes with immediate -binding get created more slowly. If it is low, then the risk of -conflicts while setting the "selected node" annotation increases and -the apiserver load will be higher. +in the range from zero to the base delay. If the value is high, +volumes with immediate binding get created more slowly. If it is low, +then the risk of conflicts while setting the "selected node" +annotation increases and the apiserver load will be higher. There is an exponential backoff per PVC which is used for unexpected problems. 
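To make the jitter concrete, here is a minimal Go sketch of the delay selection described above, assuming only what the flag documentation states: the initial wait is drawn between zero and `--node-deployment-base-delay` and never exceeds `--node-deployment-max-delay`. The helper name is hypothetical; this is not the code added by this patch series.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// initialOwnershipDelay is a hypothetical helper that mimics the documented
// behaviour: pick a jittered initial wait in [0, baseDelay], capped by maxDelay.
func initialOwnershipDelay(baseDelay, maxDelay time.Duration) time.Duration {
	delay := time.Duration(rand.Int63n(int64(baseDelay) + 1))
	if delay > maxDelay {
		delay = maxDelay
	}
	return delay
}

func main() {
	// Flag defaults documented above: 20s base delay, 60s max delay.
	fmt.Println(initialOwnershipDelay(20*time.Second, 60*time.Second))
}
```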
Normally, an owner for a PVC is chosen during the first diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index 7c21e3196b..24c93ec201 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -29,11 +29,14 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" flag "github.com/spf13/pflag" + v1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - v1 "k8s.io/client-go/listers/core/v1" + listersv1 "k8s.io/client-go/listers/core/v1" storagelistersv1 "k8s.io/client-go/listers/storage/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -94,6 +97,7 @@ var ( nodeDeploymentImmediateBinding = flag.Bool("node-deployment-immediate-binding", true, "Determines whether immediate binding is supported when deployed on each node.") nodeDeploymentBaseDelay = flag.Duration("node-deployment-base-delay", 20*time.Second, "Determines how long the external-provisioner sleeps initially before trying to own a PVC with immediate binding.") nodeDeploymentMaxDelay = flag.Duration("node-deployment-max-delay", 60*time.Second, "Determines how long the external-provisioner sleeps at most before trying to own a PVC with immediate binding.") + localTopology = flag.Bool("local-topology", false, "Instead of watching Node and CSINode objects, use only the topology provided by the CSI driver. Only valid in combination with --node-deployment.") featureGates map[string]bool provisionController *controller.ProvisionController @@ -261,14 +265,55 @@ func main() { nodeDeployment.NodeInfo = *nodeInfo } - var nodeLister v1.NodeLister + var nodeLister listersv1.NodeLister var csiNodeLister storagelistersv1.CSINodeLister if ctrl.SupportsTopology(pluginCapabilities) { - // TODO (?): when deployed on each node with --strict-topology=true, then the topology - // code only needs the static information about the local node. We can avoid the overhead - // of watching the actual objects by providing just that information. - csiNodeLister = factory.Storage().V1().CSINodes().Lister() - nodeLister = factory.Core().V1().Nodes().Lister() + if *localTopology { + if nodeDeployment == nil { + klog.Fatal("--local-topology is only valid in combination with --node-deployment") + } + // Avoid watching in favor of fake, static objects. This is particularly relevant for + // Node objects, which can generate significant traffic. + csiNode := &storagev1.CSINode{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeDeployment.NodeName, + }, + Spec: storagev1.CSINodeSpec{ + Drivers: []storagev1.CSINodeDriver{ + { + Name: provisionerName, + NodeID: nodeDeployment.NodeInfo.NodeId, + }, + }, + }, + } + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeDeployment.NodeName, + }, + } + if nodeDeployment.NodeInfo.AccessibleTopology != nil { + for key := range nodeDeployment.NodeInfo.AccessibleTopology.Segments { + csiNode.Spec.Drivers[0].TopologyKeys = append(csiNode.Spec.Drivers[0].TopologyKeys, key) + } + node.Labels = nodeDeployment.NodeInfo.AccessibleTopology.Segments + } + klog.Infof("using local topology with Node = %+v and CSINode = %+v", node, csiNode) + + // We make those fake objects available to the topology code via informers which + // never change. 
+ stoppedFactory := informers.NewSharedInformerFactory(clientset, 1000*time.Hour) + csiNodes := stoppedFactory.Storage().V1().CSINodes() + nodes := stoppedFactory.Core().V1().Nodes() + csiNodes.Informer().GetStore().Add(csiNode) + nodes.Informer().GetStore().Add(node) + csiNodeLister = csiNodes.Lister() + nodeLister = nodes.Lister() + + } else { + csiNodeLister = factory.Storage().V1().CSINodes().Lister() + nodeLister = factory.Core().V1().Nodes().Lister() + } } // ------------------------------- From 2b3ac29e9c34f368148f67cebf95831a32d48252 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 3 Dec 2020 12:18:46 +0100 Subject: [PATCH 4/7] docs: document handling of local volumes on missing nodes This is intentionally a separate section because although it applies to distributed provisioning, the same problem also arises when a CSI driver handles provisioning of local volumes differently. --- README.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/README.md b/README.md index 6d8be828ef..7b42e2c0ca 100644 --- a/README.md +++ b/README.md @@ -342,6 +342,27 @@ instead implement a custom policy in a separate controller which sets the "selected node" annotation to trigger local provisioning on the desired node. +### Deleting local volumes after a node failure or removal + +When a node with local volumes gets removed from a cluster before +deleting those volumes, the PV and PVC objects may still exist. It may +be possible to remove the PVC normally if the volume was not in use by +any pod on the node, but normal deletion of the volume and thus +deletion of the PV is not possible anymore because the CSI driver +instance on the node is not available or reachable anymore and therefore +Kubernetes cannot be sure that it is okay to remove the PV. + +When an administrator is sure that the node is never going to come +back, then the local volumes can be removed manually: +- force-delete objects: `kubectl delete pv --wait=false --grace-period=0 --force` +- remove all finalizers: `kubectl patch pv -p '{"metadata":{"finalizers":null}}'` + +If there still was a PVC which was bound to that PV, it then will be +moved to phase "Lost". It has to be deleted and re-created if still +needed because no new volume will be created for it. Editing the PVC +to revert it to phase "Unbound" is not allowed by the Kubernetes +API server. + ## Community, discussion, contribution, and support Learn how to engage with the Kubernetes community on the [community page](http://kubernetes.io/community/). From b9301ee82a99b810fb291a394bff3fad1a591bf9 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 9 Dec 2020 17:46:20 +0100 Subject: [PATCH 5/7] add --capacity-controller-deployment-mode=local Producing CSIStorageCapacity objects for a node uses the same code, the only difference is that there is just a single topology segment that the external-provisioner needs to iterate over. Also, that segment is fixed. Therefore we can use the simple mock informer that previously was only used for testing. --- README.md | 16 ++- cmd/csi-provisioner/csi-provisioner.go | 32 +++-- pkg/capacity/capacity_test.go | 160 ++++++------------------- pkg/capacity/mode.go | 6 +- pkg/capacity/topology/mock.go | 89 ++++++++++++++ 5 files changed, 162 insertions(+), 141 deletions(-) create mode 100644 pkg/capacity/topology/mock.go diff --git a/README.md b/README.md index 7b42e2c0ca..3deb53b2b4 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ Note that the external-provisioner does not scale with more replicas. 
Only one e See the [storage capacity section](#capacity-support) below for details. -* `--capacity-controller-deployment-mode=central`: Setting this enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. 'central' is currently the only supported mode. Use it when there is just one active provisioner in the cluster. The default is to not produce CSIStorageCapacity objects. +* `--capacity-controller-deployment-mode=central|local`: Setting this enables producing CSIStorageCapacity objects with capacity information from the driver's GetCapacity call. Use `central` when there is just one active external-provisioner in the cluster. Use `local` when deploying external-provisioner on each node with distributed provisioning. The default is to not produce CSIStorageCapacity objects. * `--capacity-ownerref-level `: The level indicates the number of objects that need to be traversed starting from the pod identified by the POD_NAME and POD_NAMESPACE environment variables to reach the owning object for CSIStorageCapacity objects: 0 for the pod itself, 1 for a StatefulSet, 2 for a Deployment, etc. Defaults to `1` (= StatefulSet). @@ -151,7 +151,7 @@ determine with the `POD_NAME/POD_NAMESPACE` environment variables and the `--capacity-ownerref-level` parameter. Other solutions will be added in the future. -To enable this feature in a driver deployment (see also the +To enable this feature in a driver deployment with a central controller (see also the [`deploy/kubernetes/storage-capacity.yaml`](deploy/kubernetes/storage-capacity.yaml) example): @@ -167,7 +167,7 @@ example): fieldRef: fieldPath: metadata.name ``` -- Add `--enable-capacity=central` to the command line flags. +- Add `--capacity-controller-deployment-mode=central` to the command line flags. - Add `StorageCapacity: true` to the CSIDriver information object. Without it, external-provisioner will publish information, but the Kubernetes scheduler will ignore it. This can be used to first @@ -182,7 +182,7 @@ example): with `--capacity-threads`. - Optional: enable producing information also for storage classes that use immediate volume binding with - `--enable-capacity=immediate-binding`. This is usually not needed + `--capacity-for-immediate-binding`. This is usually not needed because such volumes are created by the driver without involving the Kubernetes scheduler and thus the published information would just be ignored. @@ -232,6 +232,14 @@ CSIStorageCapacity objects, so in theory a malfunctioning or malicious driver deployment could also publish incorrect information about some other driver. +The deployment with [distributed +provisioning](#distributed-provisioning) is almost the same as above, +with some minor changes: +- Add `--capacity-controller-deployment-mode=local` to the command line flags. +- Use `--capacity-ownerref-level=0` and the `POD_NAMESPACE/POD_NAME` + variables to make the pod that contains the external-provisioner + the owner of CSIStorageCapacity objects for the node. + ### CSI error and timeout handling The external-provisioner invokes all gRPC calls to CSI driver with timeout provided by `--timeout` command line argument (15 seconds by default). 
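As a rough illustration of what the `local` deployment mode produces, the sketch below constructs the kind of CSIStorageCapacity object that ends up owned by the external-provisioner pod itself (`--capacity-ownerref-level=0`), with the node's fixed topology segment as its node topology selector. It assumes the `storage.k8s.io/v1alpha1` API current for this release; the pod name, namespace, storage class, topology key/value and capacity are placeholders, and the real controller also records the owning pod's UID.

```go
package main

import (
	"fmt"

	storagev1alpha1 "k8s.io/api/storage/v1alpha1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	capacity := resource.MustParse("100Gi")
	csc := storagev1alpha1.CSIStorageCapacity{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "csisc-",
			Namespace:    "default", // POD_NAMESPACE of the external-provisioner
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: "v1",
				Kind:       "Pod",
				Name:       "csi-example-node-abcde", // POD_NAME of the external-provisioner
			}},
		},
		StorageClassName: "example-storage-class",
		NodeTopology: &metav1.LabelSelector{
			MatchLabels: map[string]string{
				// The single, fixed topology segment reported by the local CSI driver instance.
				"topology.example.com/node": "node-1",
			},
		},
		Capacity: &capacity,
	}
	fmt.Printf("%+v\n", csc)
}
```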
diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index 24c93ec201..3975fc8062 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -390,7 +390,8 @@ func main() { ) var capacityController *capacity.Controller - if *capacityMode == capacity.DeploymentModeCentral { + if *capacityMode == capacity.DeploymentModeCentral || + *capacityMode == capacity.DeploymentModeLocal { podName := os.Getenv("POD_NAME") namespace := os.Getenv("POD_NAMESPACE") if podName == "" || namespace == "" { @@ -407,13 +408,28 @@ func main() { } klog.Infof("using %s/%s %s as owner of CSIStorageCapacity objects", controller.APIVersion, controller.Kind, controller.Name) - topologyInformer := topology.NewNodeTopology( - provisionerName, - clientset, - factory.Core().V1().Nodes(), - factory.Storage().V1().CSINodes(), - workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"), - ) + var topologyInformer topology.Informer + if *capacityMode == capacity.DeploymentModeCentral { + topologyInformer = topology.NewNodeTopology( + provisionerName, + clientset, + factory.Core().V1().Nodes(), + factory.Storage().V1().CSINodes(), + workqueue.NewNamedRateLimitingQueue(rateLimiter, "csitopology"), + ) + } else { + var segment topology.Segment + if nodeDeployment == nil { + klog.Fatal("--capacity-controller-deployment-mode=local is only valid in combination with --node-deployment") + } + if nodeDeployment.NodeInfo.AccessibleTopology != nil { + for key, value := range nodeDeployment.NodeInfo.AccessibleTopology.Segments { + segment = append(segment, topology.SegmentEntry{Key: key, Value: value}) + } + } + klog.Infof("producing CSIStorageCapacity objects with fixed topology segment %s", segment) + topologyInformer = topology.NewFixedNodeTopology(&segment) + } // We only need objects from our own namespace. The normal factory would give // us an informer for the entire cluster. diff --git a/pkg/capacity/capacity_test.go b/pkg/capacity/capacity_test.go index 95cec81fae..3c02d874db 100644 --- a/pkg/capacity/capacity_test.go +++ b/pkg/capacity/capacity_test.go @@ -88,22 +88,20 @@ var ( func TestController(t *testing.T) { testcases := map[string]struct { immediateBinding bool - topology mockTopology + topology *topology.Mock storage mockCapacity initialSCs []testSC initialCapacities []testCapacity expectedCapacities []testCapacity modify func(ctx context.Context, clientSet *fakeclientset.Clientset, expected []testCapacity) (modifiedExpected []testCapacity, err error) capacityChange func(ctx context.Context, storage *mockCapacity, expected []testCapacity) (modifiedExpected []testCapacity) - topologyChange func(ctx context.Context, topology *mockTopology, expected []testCapacity) (modifiedExpected []testCapacity) + topologyChange func(ctx context.Context, topology *topology.Mock, expected []testCapacity) (modifiedExpected []testCapacity) }{ "empty": { expectedCapacities: []testCapacity{}, }, "one segment": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), expectedCapacities: []testCapacity{}, }, "one class": { @@ -116,9 +114,7 @@ func TestController(t *testing.T) { expectedCapacities: []testCapacity{}, }, "one capacity object": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. 
@@ -140,9 +136,7 @@ func TestController(t *testing.T) { }, }, "ignore SC with immediate binding": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -159,9 +153,7 @@ func TestController(t *testing.T) { }, "support SC with immediate binding": { immediateBinding: true, - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -184,9 +176,7 @@ func TestController(t *testing.T) { }, }, "reuse one capacity object, no changes": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -218,9 +208,7 @@ func TestController(t *testing.T) { }, }, "reuse one capacity object, update capacity": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -252,9 +240,7 @@ func TestController(t *testing.T) { }, }, "obsolete object, missing SC": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -332,12 +318,7 @@ func TestController(t *testing.T) { }, }, "two segments, two classes, four objects missing": { - topology: mockTopology{ - segments: []*topology.Segment{ - &layer0, - &layer0other, - }, - }, + topology: topology.NewMock(&layer0, &layer0other), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -386,12 +367,7 @@ func TestController(t *testing.T) { }, }, "two segments, two classes, four objects updated": { - topology: mockTopology{ - segments: []*topology.Segment{ - &layer0, - &layer0other, - }, - }, + topology: topology.NewMock(&layer0, &layer0other), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -470,12 +446,7 @@ func TestController(t *testing.T) { }, }, "two segments, two classes, two added, two removed": { - topology: mockTopology{ - segments: []*topology.Segment{ - &layer0, - &layer0other, - }, - }, + topology: topology.NewMock(&layer0, &layer0other), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -552,9 +523,7 @@ func TestController(t *testing.T) { }, }, "fix modified capacity": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -591,9 +560,7 @@ func TestController(t *testing.T) { }, }, "re-create capacity": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -638,9 +605,7 @@ func TestController(t *testing.T) { }, }, "ignore capacity after owner change": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. 
@@ -688,9 +653,7 @@ func TestController(t *testing.T) { }, }, "delete and recreate by someone": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -734,9 +697,7 @@ func TestController(t *testing.T) { }, }, "storage capacity change": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -766,7 +727,6 @@ func TestController(t *testing.T) { }, }, "add storage topology segment": { - topology: mockTopology{}, storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -790,8 +750,8 @@ func TestController(t *testing.T) { }, }, expectedCapacities: nil, - topologyChange: func(ctx context.Context, topo *mockTopology, expected []testCapacity) []testCapacity { - topo.modify([]*topology.Segment{&layer0} /* added */, nil /* removed */) + topologyChange: func(ctx context.Context, topo *topology.Mock, expected []testCapacity) []testCapacity { + topo.Modify([]*topology.Segment{&layer0} /* added */, nil /* removed */) return append(expected, testCapacity{ uid: "CSISC-UID-1", resourceVersion: csiscRev + "0", @@ -803,7 +763,6 @@ func TestController(t *testing.T) { }, "add storage topology segment, immediate binding": { immediateBinding: true, - topology: mockTopology{}, storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -822,8 +781,8 @@ func TestController(t *testing.T) { }, }, expectedCapacities: nil, - topologyChange: func(ctx context.Context, topo *mockTopology, expected []testCapacity) []testCapacity { - topo.modify([]*topology.Segment{&layer0} /* added */, nil /* removed */) + topologyChange: func(ctx context.Context, topo *topology.Mock, expected []testCapacity) []testCapacity { + topo.Modify([]*topology.Segment{&layer0} /* added */, nil /* removed */) // We don't check the UID here because we don't want to fail when // ordering of storage classes isn't such that the "immediate-sc" is seen first. return append(expected, testCapacity{ @@ -842,9 +801,7 @@ func TestController(t *testing.T) { }, }, "remove storage topology segment": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. @@ -871,15 +828,13 @@ func TestController(t *testing.T) { quantity: "1Gi", }, }, - topologyChange: func(ctx context.Context, topo *mockTopology, expected []testCapacity) []testCapacity { - topo.modify(nil /* added */, topo.segments[:] /* removed */) + topologyChange: func(ctx context.Context, topo *topology.Mock, expected []testCapacity) []testCapacity { + topo.Modify(nil /* added */, topo.List()[:] /* removed */) return nil }, }, "add and remove storage topology segment": { - topology: mockTopology{ - segments: []*topology.Segment{&layer0}, - }, + topology: topology.NewMock(&layer0), storage: mockCapacity{ capacity: map[string]interface{}{ // This matches layer0. 
@@ -907,9 +862,9 @@ func TestController(t *testing.T) { quantity: "1Gi", }, }, - topologyChange: func(ctx context.Context, topo *mockTopology, expected []testCapacity) []testCapacity { - topo.modify([]*topology.Segment{&layer0other}, /* added */ - topo.segments[:] /* removed */) + topologyChange: func(ctx context.Context, topo *topology.Mock, expected []testCapacity) []testCapacity { + topo.Modify([]*topology.Segment{&layer0other}, /* added */ + topo.List()[:] /* removed */) return []testCapacity{ { uid: "CSISC-UID-2", @@ -937,7 +892,11 @@ func TestController(t *testing.T) { clientSet := fakeclientset.NewSimpleClientset(objects...) clientSet.PrependReactor("create", "csistoragecapacities", createCSIStorageCapacityReactor()) clientSet.PrependReactor("update", "csistoragecapacities", updateCSIStorageCapacityReactor()) - c := fakeController(ctx, clientSet, &tc.storage, &tc.topology, tc.immediateBinding) + topo := tc.topology + if topo == nil { + topo = topology.NewMock() + } + c := fakeController(ctx, clientSet, &tc.storage, topo, tc.immediateBinding) for _, testCapacity := range tc.initialCapacities { capacity := makeCapacity(testCapacity) _, err := clientSet.StorageV1alpha1().CSIStorageCapacities(ownerNamespace).Create(ctx, capacity, metav1.CreateOptions{}) @@ -979,7 +938,7 @@ func TestController(t *testing.T) { } if tc.topologyChange != nil { klog.Info("modifying topology") - expectedCapacities = tc.topologyChange(ctx, &tc.topology, expectedCapacities) + expectedCapacities = tc.topologyChange(ctx, topo, expectedCapacities) if err := validateCapacitiesEventually(ctx, c, clientSet, expectedCapacities); err != nil { t.Fatalf("modified capacity: %v", err) } @@ -1242,57 +1201,6 @@ func getCapacity(capacity map[string]interface{}, segments map[string]string, la return "", nil } -// mockTopology simulates a driver installation on different nodes. -type mockTopology struct { - segments []*topology.Segment - callbacks []topology.Callback -} - -func (mt *mockTopology) AddCallback(cb topology.Callback) { - mt.callbacks = append(mt.callbacks, cb) - cb(mt.segments, nil) -} - -func (mt *mockTopology) List() []*topology.Segment { - return mt.segments -} - -func (mt *mockTopology) Run(ctx context.Context) { -} - -func (mt *mockTopology) HasSynced() bool { - return true -} - -func (mt *mockTopology) modify(add, remove []*topology.Segment) { - var added, removed []*topology.Segment - for _, segment := range add { - if mt.segmentIndex(segment) == -1 { - added = append(added, segment) - mt.segments = append(mt.segments, segment) - } - } - for _, segment := range remove { - index := mt.segmentIndex(segment) - if index != -1 { - removed = append(removed, segment) - mt.segments = append(mt.segments[0:index], mt.segments[index+1:]...) - } - } - for _, cb := range mt.callbacks { - cb(added, removed) - } -} - -func (mt *mockTopology) segmentIndex(segment *topology.Segment) int { - for i, otherSegment := range mt.segments { - if otherSegment == segment { - return i - } - } - return -1 -} - type testCapacity struct { uid types.UID resourceVersion string diff --git a/pkg/capacity/mode.go b/pkg/capacity/mode.go index 11d0f11fe9..9817a8c04c 100644 --- a/pkg/capacity/mode.go +++ b/pkg/capacity/mode.go @@ -33,7 +33,7 @@ const ( DeploymentModeCentral = DeploymentMode("central") // DeploymentModeLocal enables the mode where external-provisioner - // is deployed on each node. Not implemented yet. + // is deployed on each node. 
DeploymentModeLocal = DeploymentMode("local") // DeploymentModeUnset disables the capacity feature completely. @@ -44,7 +44,7 @@ const ( // with optional whitespace. func (mode *DeploymentMode) Set(value string) error { switch DeploymentMode(value) { - case DeploymentModeCentral, DeploymentModeUnset: + case DeploymentModeCentral, DeploymentModeLocal, DeploymentModeUnset: *mode = DeploymentMode(value) default: return errors.New("invalid value") @@ -57,7 +57,7 @@ func (mode *DeploymentMode) String() string { } func (mode *DeploymentMode) Type() string { - return strings.Join([]string{string(DeploymentModeCentral) /*, string(DeploymentModeLocal) */}, "|") + return strings.Join([]string{string(DeploymentModeCentral), string(DeploymentModeLocal)}, "|") } var _ flag.Value = new(DeploymentMode) diff --git a/pkg/capacity/topology/mock.go b/pkg/capacity/topology/mock.go new file mode 100644 index 0000000000..90084dec89 --- /dev/null +++ b/pkg/capacity/topology/mock.go @@ -0,0 +1,89 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topology + +import ( + "context" +) + +// NewFixedNodeTopology creates topology informer for +// a driver with a fixed topology segment. +func NewFixedNodeTopology(segment *Segment) Informer { + return NewMock(segment) +} + +// NewMock creates a new mocked topology informer with a +// certain set of pre-defined segments. +func NewMock(segments ...*Segment) *Mock { + return &Mock{ + segments: segments, + } +} + +var _ Informer = &Mock{} + +// Mock simulates a driver installation on one or more nodes. +type Mock struct { + segments []*Segment + callbacks []Callback +} + +func (mt *Mock) AddCallback(cb Callback) { + mt.callbacks = append(mt.callbacks, cb) + cb(mt.segments, nil) +} + +func (mt *Mock) List() []*Segment { + return mt.segments +} + +func (mt *Mock) Run(ctx context.Context) { +} + +func (mt *Mock) HasSynced() bool { + return true +} + +// Modify adds and/or removes segments. +func (mt *Mock) Modify(add, remove []*Segment) { + var added, removed []*Segment + for _, segment := range add { + if mt.segmentIndex(segment) == -1 { + added = append(added, segment) + mt.segments = append(mt.segments, segment) + } + } + for _, segment := range remove { + index := mt.segmentIndex(segment) + if index != -1 { + removed = append(removed, segment) + mt.segments = append(mt.segments[0:index], mt.segments[index+1:]...) 
+ } + } + for _, cb := range mt.callbacks { + cb(added, removed) + } +} + +func (mt *Mock) segmentIndex(segment *Segment) int { + for i, otherSegment := range mt.segments { + if otherSegment == segment { + return i + } + } + return -1 +} From 28257ef39c54269028682e4ebdbc6b4fae5ec20b Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 15 Dec 2020 21:06:14 +0100 Subject: [PATCH 6/7] docs, polling delay: review feedback --- README.md | 10 ++++++++-- pkg/controller/controller.go | 30 ++++++++++++++++++++++++------ 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 3deb53b2b4..99afdece00 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ See the [storage capacity section](#capacity-support) below for details. * `--node-deployment`: Enables deploying the external-provisioner together with a CSI driver on nodes to manage node-local volumes. Off by default. -* `--node-deployment-immediate-binding`: Determines whether immediate binding is supported when deployed on each node. Enabled by default, use `--node-deployment-immediate-binding=false` if not desired. +* `--node-deployment-immediate-binding`: Determines whether immediate binding is supported when deployed on each node. Enabled by default, use `--node-deployment-immediate-binding=false` if not desired. Disabling it may be useful for example when a custom controller will select nodes for PVCs. * `--node-deployment-base-delay`: Determines how long the external-provisioner sleeps initially before trying to own a PVC with immediate binding. Defaults to 20 seconds. @@ -335,7 +335,10 @@ volume was probably higher, but that wasn't measured. Note that the QPS settings of kube-controller-manager and external-provisioner have to be increased at the moment (Kubernetes -1.19) to provision volumes faster than around 4 volumes/second. +1.19) to provision volumes faster than around 4 volumes/second. Those +settings will eventually get replaced with [flow control in the API +server +itself](https://kubernetes.io/docs/concepts/cluster-administration/flow-control/). Beware that if *no* node has sufficient storage available, then also no `CreateVolume` call is attempted and thus no events are generated @@ -365,6 +368,9 @@ back, then the local volumes can be removed manually: - force-delete objects: `kubectl delete pv --wait=false --grace-period=0 --force` - remove all finalizers: `kubectl patch pv -p '{"metadata":{"finalizers":null}}'` +It may also be necessary to scrub disks before reusing them because +the CSI driver had no chance to do that. + If there still was a PVC which was bound to that PV, it then will be moved to phase "Lost". It has to be deleted and re-created if still needed because no new volume will be created for it. Editing the PVC diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 1e6e475fb7..f97abe54a5 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -209,12 +209,22 @@ type requiredCapabilities struct { // NodeDeployment contains additional parameters for running external-provisioner alongside a // CSI driver on one or more nodes in the cluster. type NodeDeployment struct { - NodeName string - ClaimInformer coreinformers.PersistentVolumeClaimInformer - NodeInfo csi.NodeGetInfoResponse + // NodeName is the name of the node in Kubernetes on which the external-provisioner runs. 
+	NodeName string
+	// ClaimInformer is needed to detect when some other external-provisioner
+	// became the owner of a PVC while the local one is still waiting before
+	// trying to become the owner itself.
+	ClaimInformer coreinformers.PersistentVolumeClaimInformer
+	// NodeInfo is the result of NodeGetInfo. It is needed to determine which
+	// PVs were created for the node.
+	NodeInfo csi.NodeGetInfoResponse
+	// ImmediateBinding enables support for PVCs with immediate binding.
 	ImmediateBinding bool
-	BaseDelay time.Duration
-	MaxDelay time.Duration
+	// BaseDelay is the initial time that the external-provisioner waits
+	// before trying to become the owner of a PVC with immediate binding.
+	BaseDelay time.Duration
+	// MaxDelay is the maximum for the initial wait time.
+	MaxDelay time.Duration
 }
 
 type internalNodeDeployment struct {
@@ -1362,7 +1372,15 @@ func (nc *internalNodeDeployment) becomeOwner(ctx context.Context, p *csiProvisi
 	klog.V(5).Infof("will try to become owner of PVC %s/%s with resource version %s in %s (attempt #%d)", claim.Namespace, claim.Name, claim.ResourceVersion, delay, requeues)
 	sleep, cancel := context.WithTimeout(ctx, delay)
 	defer cancel()
-	ticker := time.NewTicker(10 * time.Millisecond)
+	// When the base delay is high, we should also check less often.
+	// With multiple provisioners running in parallel, it becomes more
+	// likely that one of them became the owner quickly, so we don't
+	// want to check too slowly either.
+	pollInterval := nc.BaseDelay / 100
+	if pollInterval < 10*time.Millisecond {
+		pollInterval = 10 * time.Millisecond
+	}
+	ticker := time.NewTicker(pollInterval)
 	defer ticker.Stop()
 	check := func() (bool, *v1.PersistentVolumeClaim, error) {
 		current, err := nc.ClaimInformer.Lister().PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
From d71138a387aafe92b4ad3700338168ec45860263 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Tue, 15 Dec 2020 22:10:07 +0100
Subject: [PATCH 7/7] remove --local-topology

It is uncertain whether that option is needed. Removing it simplifies
the code.
---
 README.md                              | 10 ++++++----
 cmd/csi-provisioner/csi-provisioner.go |  6 +-----
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index 99afdece00..6759fa150e 100644
--- a/README.md
+++ b/README.md
@@ -94,8 +94,6 @@ See the [storage capacity section](#capacity-support) below for details.
 
 * `--node-deployment-max-delay`: Determines how long the external-provisioner sleeps at most before trying to own a PVC with immediate binding. Defaults to 60 seconds.
 
-* `--local-topology`: Instead of watching Node and CSINode objects, use only the topology provided by the CSI driver. Only valid in combination with `--node-deployment`. Disabled by default, but recommended for drivers which have a single topology key with different values for each node (i.e. local volumes).
-
 #### Other recognized arguments
 
 * `--feature-gates `: A set of comma separated `=` pairs that describe feature gates for alpha/experimental features. See [list of features](#feature-status) or `--help` output for list of recognized features. Example: `--feature-gates Topology=true` to enable Topology feature that's disabled by default.
@@ -281,13 +279,17 @@ each CSI driver on different nodes.
The CSI driver deployment must: to match the expected cluster size and desired response times (only relevant when there are storage classes with immediate binding, see below for details) -- use `--local-topology` if volumes are only accessible inside the node - where they get provisioned - set the `NODE_NAME` environment variable to the name of the Kubernetes node - implement `GetCapacity` Usage of `--strict-topology` and `--immediate-topology=false` is recommended because it makes the `CreateVolume` invocations simpler. +Topology information is always derived exclusively from the +information returned by the CSI driver that runs on the same node, +without combining that with information stored for other nodes. This +works as long as each node is in its own topology segment, +i.e. usually with a single topology key and one unique value for each +node. Volume provisioning with late binding works as before, except that each external-provisioner instance checks the "selected node" diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go index 3975fc8062..4c33b43e8d 100644 --- a/cmd/csi-provisioner/csi-provisioner.go +++ b/cmd/csi-provisioner/csi-provisioner.go @@ -97,7 +97,6 @@ var ( nodeDeploymentImmediateBinding = flag.Bool("node-deployment-immediate-binding", true, "Determines whether immediate binding is supported when deployed on each node.") nodeDeploymentBaseDelay = flag.Duration("node-deployment-base-delay", 20*time.Second, "Determines how long the external-provisioner sleeps initially before trying to own a PVC with immediate binding.") nodeDeploymentMaxDelay = flag.Duration("node-deployment-max-delay", 60*time.Second, "Determines how long the external-provisioner sleeps at most before trying to own a PVC with immediate binding.") - localTopology = flag.Bool("local-topology", false, "Instead of watching Node and CSINode objects, use only the topology provided by the CSI driver. Only valid in combination with --node-deployment.") featureGates map[string]bool provisionController *controller.ProvisionController @@ -268,10 +267,7 @@ func main() { var nodeLister listersv1.NodeLister var csiNodeLister storagelistersv1.CSINodeLister if ctrl.SupportsTopology(pluginCapabilities) { - if *localTopology { - if nodeDeployment == nil { - klog.Fatal("--local-topology is only valid in combination with --node-deployment") - } + if nodeDeployment != nil { // Avoid watching in favor of fake, static objects. This is particularly relevant for // Node objects, which can generate significant traffic. csiNode := &storagev1.CSINode{