Skip to content

Commit

Permalink
Merge pull request #253 from ddebroy/csimigprov1
Browse files Browse the repository at this point in the history
Add dynamic provisioning support for CSI migration scenarios
  • Loading branch information
k8s-ci-robot authored Mar 25, 2019
2 parents 7f7d828 + ddfb2f5 commit 41036d7
Show file tree
Hide file tree
Showing 12 changed files with 1,200 additions and 45 deletions.
18 changes: 17 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 21 additions & 6 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflag "k8s.io/apiserver/pkg/util/flag"
csitranslationlib "k8s.io/csi-translation-lib"
)

var (
Expand Down Expand Up @@ -159,19 +160,33 @@ func init() {
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

provisionerOptions := []func(*controller.ProvisionController) error{
controller.LeaderElection(*enableLeaderElection),
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.Threadiness(int(*workerThreads)),
}

supportsMigrationFromInTreePluginName := ""
if csitranslationlib.IsMigratedCSIDriverByName(provisionerName) {
supportsMigrationFromInTreePluginName, err = csitranslationlib.GetInTreeNameFromCSIName(provisionerName)
if err != nil {
klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
}
klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
}

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
csiProvisioner,
serverVersion.GitVersion,
controller.LeaderElection(*enableLeaderElection),
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.Threadiness(int(*workerThreads)),
provisionerOptions...,
)
}

Expand Down
104 changes: 71 additions & 33 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
csitranslationlib "k8s.io/csi-translation-lib"
"k8s.io/klog"

"google.golang.org/grpc"
Expand Down Expand Up @@ -146,19 +147,20 @@ var (

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
supportsMigrationFromInTreePluginName string
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -214,22 +216,24 @@ func NewCSIProvisioner(client kubernetes.Interface,
snapshotClient snapclientset.Interface,
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet) controller.Provisioner {
controllerCapabilities connection.ControllerCapabilitySet,
supportsMigrationFromInTreePluginName string) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
}
return provisioner
}
Expand Down Expand Up @@ -328,6 +332,31 @@ func getVolumeCapability(
}

func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
createVolumeRequestParameters := options.Parameters
migratedVolume := false
if p.supportsMigrationFromInTreePluginName != "" {
storageClassName := options.PVC.Spec.StorageClassName
// TODO(https://github.com/kubernetes-csi/external-provisioner/issues/256): use informers
// NOTE: we cannot depend on PVC.Annotations[volume.beta.kubernetes.io/storage-provisioner] to get
// the in-tree provisioner name in case of CSI migration scenarios. The annotation will be
// set to the CSI provisioner name by PV controller for migration scenarios
// so that external provisioner can correctly pick up the PVC pointing to an in-tree plugin
storageClass, err := p.client.StorageV1().StorageClasses().Get(*storageClassName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get storage class named %s: %v", *storageClassName, err)
}
if storageClass.Provisioner == p.supportsMigrationFromInTreePluginName {
klog.V(2).Infof("translating storage class parameters for in-tree plugin %s to CSI", storageClass.Provisioner)
createVolumeRequestParameters, err = csitranslationlib.TranslateInTreeStorageClassParametersToCSI(p.supportsMigrationFromInTreePluginName, options.Parameters)
if err != nil {
return nil, fmt.Errorf("failed to translate storage class parameters: %v", err)
}
migratedVolume = true
} else {
klog.V(4).Infof("skip translation of storage class parameters for plugin: %s", storageClass.Provisioner)
}
}

var needSnapshotSupport bool
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
Expand Down Expand Up @@ -358,7 +387,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis

fsTypesFound := 0
fsType := ""
for k, v := range options.Parameters {
for k, v := range createVolumeRequestParameters {
if strings.ToLower(k) == "fstype" || k == prefixedFsTypeKey {
fsType = v
fsTypesFound++
Expand Down Expand Up @@ -386,7 +415,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
// Create a CSI CreateVolumeRequest and Response
req := csi.CreateVolumeRequest{
Name: pvName,
Parameters: options.Parameters,
Parameters: createVolumeRequestParameters,
VolumeCapabilities: volumeCaps,
CapacityRange: &csi.CapacityRange{
RequiredBytes: int64(volSizeBytes),
Expand Down Expand Up @@ -421,7 +450,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis

// Resolve provision secret credentials.
// No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.Parameters, pvName, nil)
provisionerSecretRef, err := getSecretReference(provisionerSecretParams, createVolumeRequestParameters, pvName, nil)
if err != nil {
return nil, err
}
Expand All @@ -432,20 +461,20 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
req.Secrets = provisionerCredentials

// Resolve controller publish, node stage, node publish secret references
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.Parameters, pvName, options.PVC)
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.Parameters, pvName, options.PVC)
nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.Parameters, pvName, options.PVC)
nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}

req.Parameters, err = removePrefixedParameters(options.Parameters)
req.Parameters, err = removePrefixedParameters(createVolumeRequestParameters)
if err != nil {
return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
}
Expand Down Expand Up @@ -519,6 +548,15 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
pv.Spec.PersistentVolumeSource.CSI.FSType = fsType
}

if migratedVolume {
pv, err = csitranslationlib.TranslateCSIPVToInTree(pv)
if err != nil {
klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err)
p.Delete(pv)
return nil, err
}
}

klog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)

return pv, nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer driver.Stop()

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

// Requested PVC with requestedBytes storage
opts := controller.VolumeOptions{
Expand Down Expand Up @@ -1166,7 +1166,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1516,7 +1516,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
})

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1609,7 +1609,7 @@ func TestProvisionWithTopology(t *testing.T) {
clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1650,7 +1650,7 @@ func TestProvisionWithMountOptions(t *testing.T) {
clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps)
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down
Loading

0 comments on commit 41036d7

Please sign in to comment.