Skip to content

Commit

Permalink
Cache driver capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
jsafrane committed Mar 5, 2019
1 parent 1f293ff commit 7798ce6
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 400 deletions.
19 changes: 16 additions & 3 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import (
"strings"
"time"

"k8s.io/klog"

flag "github.com/spf13/pflag"

"github.com/container-storage-interface/spec/lib/go/csi"
ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
"github.com/kubernetes-sigs/sig-storage-lib-external-provisioner/controller"
Expand All @@ -39,6 +38,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflag "k8s.io/apiserver/pkg/util/flag"
Expand Down Expand Up @@ -149,13 +149,26 @@ func init() {
}
klog.V(2).Infof("Detected CSI driver %s", provisionerName)

pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
if err != nil {
klog.Fatalf("Error getting CSI driver capabilities: %s", err)
}

if !pluginCapabilities[csi.PluginCapability_Service_CONTROLLER_SERVICE] {
klog.Fatalf("CSI driver does not support dynamic provisioning: plugin CONTROLLER_SERVICE capability is not reported")
}

if !controllerCapabilities[csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME] {
klog.Fatalf("CSI driver does not support dynamic provisioning: controller CREATE_DELETE_VOLUME capability is not reported")
}

// Generate a unique ID for this provisioner
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand Down
161 changes: 52 additions & 109 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,26 +144,21 @@ var (

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
}

const (
PluginCapability_CONTROLLER_SERVICE = iota
PluginCapability_ACCESSIBILITY_CONSTRAINTS
ControllerCapability_CREATE_DELETE_VOLUME
ControllerCapability_CREATE_DELETE_SNAPSHOT
)

var _ controller.Provisioner = &csiProvisioner{}
var _ controller.BlockProvisioner = &csiProvisioner{}

Expand Down Expand Up @@ -198,47 +193,23 @@ func GetDriverName(conn *grpc.ClientConn, timeout time.Duration) (string, error)
return connection.GetDriverName(ctx, conn)
}

func getDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (sets.Int, error) {
pluginCaps, err := getPluginCapabilities(conn, timeout)
func GetDriverCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.PluginCapabilitySet, connection.ControllerCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
pluginCapabilities, err := connection.GetPluginCapabilities(ctx, conn)
if err != nil {
return nil, err
return nil, nil, err
}

controllerCaps, err := getControllerCapabilities(conn, timeout)
/* Each CSI operation gets its own timeout / context */
ctx, cancel = context.WithTimeout(context.Background(), timeout)
defer cancel()
controllerCapabilities, err := connection.GetControllerCapabilities(ctx, conn)
if err != nil {
return nil, err
return nil, nil, err
}

capabilities := make(sets.Int)
for cap := range pluginCaps {
switch cap {
case csi.PluginCapability_Service_CONTROLLER_SERVICE:
capabilities.Insert(PluginCapability_CONTROLLER_SERVICE)
case csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS:
capabilities.Insert(PluginCapability_ACCESSIBILITY_CONSTRAINTS)
}
}
for cap := range controllerCaps {
switch cap {
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME:
capabilities.Insert(ControllerCapability_CREATE_DELETE_VOLUME)
case csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT:
capabilities.Insert(ControllerCapability_CREATE_DELETE_SNAPSHOT)
}
}
return capabilities, nil
}

func getPluginCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.PluginCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return connection.GetPluginCapabilities(ctx, conn)
}

func getControllerCapabilities(conn *grpc.ClientConn, timeout time.Duration) (connection.ControllerCapabilitySet, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return connection.GetControllerCapabilities(ctx, conn)
return pluginCapabilities, controllerCapabilities, nil
}

// NewCSIProvisioner creates new CSI provisioner
Expand All @@ -250,55 +221,28 @@ func NewCSIProvisioner(client kubernetes.Interface,
volumeNameUUIDLength int,
grpcClient *grpc.ClientConn,
snapshotClient snapclientset.Interface,
driverName string) controller.Provisioner {
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
}
return provisioner
}

// This function get called before any attempt to communicate with the driver.
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
func checkDriverCapabilities(grpcClient *grpc.ClientConn, timeout time.Duration, needSnapshotSupport bool) (sets.Int, error) {
capabilities, err := getDriverCapabilities(grpcClient, timeout)
if err != nil {
return nil, fmt.Errorf("failed to get capabilities: %v", err)
}

if !capabilities.Has(PluginCapability_CONTROLLER_SERVICE) {
return nil, fmt.Errorf("no plugin controller service support detected")
}

if !capabilities.Has(ControllerCapability_CREATE_DELETE_VOLUME) {
return nil, fmt.Errorf("no create/delete volume support detected")
}

// If PVC.Spec.DataSource is not nil, it indicates the request is to create volume
// from snapshot and therefore we should check for snapshot support;
// otherwise we don't need to check for snapshot support.
if needSnapshotSupport {
// Check whether plugin supports create snapshot
// If not, create volume from snapshot cannot proceed
if !capabilities.Has(ControllerCapability_CREATE_DELETE_SNAPSHOT) {
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
}
}

return capabilities, nil
}

func makeVolumeName(prefix, pvcUID string, volumeNameUUIDLength int) (string, error) {
// create persistent name based on a volumeNamePrefix and volumeNameUUIDLength
// of PVC's UID
Expand Down Expand Up @@ -386,12 +330,13 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
return nil, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
}

// Snapshot support is requested, check it
if !p.controllerCapabilities[csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT] {
return nil, fmt.Errorf("no create/delete snapshot support detected. Cannot create volume from snapshot")
}
needSnapshotSupport = true
}
capabilities, err := checkDriverCapabilities(p.grpcClient, p.timeout, needSnapshotSupport)
if err != nil {
return nil, err
}

pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
if err != nil {
Expand Down Expand Up @@ -443,8 +388,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
req.VolumeContentSource = volumeContentSource
}

if capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology) {
if p.supportsTopology() {
requirements, err := GenerateAccessibilityRequirements(
p.client,
p.csiAPIClient,
Expand Down Expand Up @@ -549,8 +493,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
},
}

if capabilities.Has(PluginCapability_ACCESSIBILITY_CONSTRAINTS) &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology) {
if p.supportsTopology() {
pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
}

Expand All @@ -568,6 +511,11 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
return pv, nil
}

func (p *csiProvisioner) supportsTopology() bool {
return p.pluginCapabilities[csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS] &&
utilfeature.DefaultFeatureGate.Enabled(features.Topology)
}

func removePrefixedParameters(param map[string]string) (map[string]string, error) {
newParam := map[string]string{}
for k, v := range param {
Expand Down Expand Up @@ -656,11 +604,6 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
}
volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

_, err := checkDriverCapabilities(p.grpcClient, p.timeout, false)
if err != nil {
return err
}

req := csi.DeleteVolumeRequest{
VolumeId: volumeId,
}
Expand All @@ -685,7 +628,7 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

_, err = p.csiClient.DeleteVolume(ctx, &req)
_, err := p.csiClient.DeleteVolume(ctx, &req)

return err
}
Expand Down
Loading

0 comments on commit 7798ce6

Please sign in to comment.