diff --git a/cmd/csi-provisioner/csi-provisioner.go b/cmd/csi-provisioner/csi-provisioner.go
index 8af4304916..69cca15a45 100644
--- a/cmd/csi-provisioner/csi-provisioner.go
+++ b/cmd/csi-provisioner/csi-provisioner.go
@@ -31,10 +31,19 @@ import (
 	"google.golang.org/grpc"
 
-	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/api/core/v1"
+
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
+	"k8s.io/client-go/tools/record"
+	utilversion "k8s.io/kubernetes/pkg/util/version"
 )
 
 var (
@@ -47,8 +56,18 @@ var (
 	volumeNameUUIDLength = flag.Int("volume-name-uuid-length", 16, "Length in characters for the generated uuid of a created volume")
 	showVersion          = flag.Bool("version", false, "Show version.")
 
+	version   = "unknown"
+	Namespace = "default"
+
+	informerFactory     informers.SharedInformerFactory
+	resourceLock        resourcelock.Interface
 	provisionController *controller.ProvisionController
-	version             = "unknown"
+)
+
+const (
+	resyncPeriod         = 1 * time.Millisecond
+	sharedResyncPeriod   = 1 * time.Second
+	defaultServerVersion = "v1.6.0"
 )
 
 func init() {
@@ -110,17 +129,102 @@ func init() {
 		}
 		time.Sleep(10 * time.Second)
 	}
+
+	informerFactory = informers.NewSharedInformerFactory(clientset, resyncPeriod)
+	claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
+	volumeInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
+	classInformer := func() cache.SharedIndexInformer {
+		if utilversion.MustParseSemantic(serverVersion.GitVersion).AtLeast(utilversion.MustParseSemantic(defaultServerVersion)) {
+			return informerFactory.Storage().V1().StorageClasses().Informer()
+		}
+		return informerFactory.Storage().V1beta1().StorageClasses().Informer()
+	}()
+	secretInformer := informerFactory.Core().V1().Secrets().Informer()
+
 	// Create the provisioner: it implements the Provisioner interface expected by
 	// the controller
-	csiProvisioner := ctrl.NewCSIProvisioner(clientset, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient)
+	csiProvisioner := ctrl.NewCSIProvisioner(
+		clientset,
+		*csiEndpoint,
+		*connectionTimeout,
+		identity,
+		*volumeNamePrefix,
+		*volumeNameUUIDLength,
+		grpcClient,
+		secretInformer.GetStore(),
+		classInformer.GetStore(),
+	)
 	provisionController = controller.NewProvisionController(
 		clientset,
 		*provisioner,
 		csiProvisioner,
 		serverVersion.GitVersion,
+		controller.ClaimsInformer(claimInformer),
+		controller.VolumesInformer(volumeInformer),
+		controller.ClassesInformer(classInformer),
 	)
+
+	recorder := makeEventRecorder(clientset)
+
+	id, err := os.Hostname()
+	if err != nil {
+		glog.Fatalf("failed to get hostname: %v", err)
+	}
+
+	resourceLock, err = resourcelock.New(
+		resourcelock.ConfigMapsResourceLock,
+		"default",
+		*provisioner,
+		clientset.CoreV1(),
+		resourcelock.ResourceLockConfig{
+			Identity:      id,
+			EventRecorder: recorder,
+		},
+	)
+	if err != nil {
+		glog.Fatalf("error creating lock: %v", err)
+	}
+}
+
+func makeEventRecorder(clientset kubernetes.Interface) record.EventRecorder {
+	eventBroadcaster := record.NewBroadcaster()
+	Recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: *provisioner})
+	eventBroadcaster.StartLogging(glog.Infof)
+	if clientset != nil {
+		glog.V(4).Infof("Sending events to api server.")
+		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientset.CoreV1().RESTClient()).Events(Namespace)})
+	} else {
+		glog.Warning("No api server defined - no events will be sent to API server.")
+		return nil
+	}
+	return Recorder
 }
 
 func main() {
-	provisionController.Run(wait.NeverStop)
+
+	run := func(stopCh <-chan struct{}) {
+		go informerFactory.Start(stopCh)
+		synced := informerFactory.WaitForCacheSync(stopCh)
+		for typ, sync := range synced {
+			if !sync {
+				glog.Errorf("Wait for shared cache %s sync timed out", typ)
+				return
+			}
+		}
+
+		provisionController.Run(stopCh)
+	}
+
+	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
+		Lock:          resourceLock,
+		LeaseDuration: 15 * time.Second,
+		RenewDeadline: 10 * time.Second,
+		RetryPeriod:   2 * time.Second,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: run,
+			OnStoppedLeading: func() {
+				glog.Fatalf("leader election lost")
+			},
+		},
+	})
 }
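Note on the startup flow above: the sketch below (not part of the patch) consolidates the new init()/makeEventRecorder()/main() pieces into one linear program so the ordering is easier to follow — build the ConfigMap lock, let only the elected leader start the shared informers, wait for the caches to sync, then run the controller. It assumes the same vendored client-go API the patch uses (newer client-go passes a context.Context to RunOrDie); the namespace, lock name, and resync period are placeholders, not values from the patch.

// Minimal sketch of the startup flow, assuming the vendored client-go API
// used by this patch. "example-provisioner", "default", and time.Minute are
// illustrative placeholders.
package main

import (
	"os"
	"time"

	"github.com/golang/glog"
	"k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		glog.Fatalf("failed to build config: %v", err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// The shared informer factory feeds the cache.Stores that the provisioner
	// and the provision controller read from instead of issuing live GETs.
	factory := informers.NewSharedInformerFactory(clientset, time.Minute)

	// Events let other replicas see who holds the lock and why.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(glog.Infof)
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-provisioner"})

	id, err := os.Hostname()
	if err != nil {
		glog.Fatalf("failed to get hostname: %v", err)
	}
	lock, err := resourcelock.New(
		resourcelock.ConfigMapsResourceLock,
		"default",             // namespace holding the ConfigMap lock
		"example-provisioner", // lock name; the patch uses the --provisioner value
		clientset.CoreV1(),
		resourcelock.ResourceLockConfig{Identity: id, EventRecorder: recorder},
	)
	if err != nil {
		glog.Fatalf("error creating lock: %v", err)
	}

	run := func(stopCh <-chan struct{}) {
		factory.Start(stopCh)
		// Block until every started informer has completed its initial list.
		for typ, synced := range factory.WaitForCacheSync(stopCh) {
			if !synced {
				glog.Fatalf("cache %v failed to sync", typ)
			}
		}
		// provisionController.Run(stopCh) would go here.
		<-stopCh
	}

	// Only the elected leader runs the controller; losing the lease is fatal,
	// so the process exits and campaigns again on restart.
	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() { glog.Fatalf("leader election lost") },
		},
	})
}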
api server.") + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientset.CoreV1().RESTClient()).Events(Namespace)}) + } else { + glog.Warning("No api server defined - no events will be sent to API server.") + return nil + } + return Recorder } func main() { - provisionController.Run(wait.NeverStop) + + run := func(stopCh <-chan struct{}) { + go informerFactory.Start(stopCh) + synced := informerFactory.WaitForCacheSync(stopCh) + for tpy, sync := range synced { + if !sync { + glog.Errorf("Wait for shared cache %s sync timeout", tpy) + return + } + } + + provisionController.Run(stopCh) + } + + leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ + Lock: resourceLock, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + glog.Fatalf("leader election lost") + }, + }, + }) } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 4ad6c85a62..aa2a8d7dab 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -33,6 +33,7 @@ import ( "github.com/kubernetes-incubator/external-storage/lib/controller" "k8s.io/api/core/v1" + apiv1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" @@ -46,6 +47,7 @@ import ( "google.golang.org/grpc/status" "github.com/container-storage-interface/spec/lib/go/csi/v0" + "k8s.io/client-go/tools/cache" ) const ( @@ -81,6 +83,8 @@ type csiProvisioner struct { volumeNamePrefix string volumeNameUUIDLength int config *rest.Config + secrets cache.Store + classes cache.Store } var _ controller.Provisioner = &csiProvisioner{} @@ -220,7 +224,9 @@ func NewCSIProvisioner(client kubernetes.Interface, identity string, volumeNamePrefix string, volumeNameUUIDLength int, - grpcClient *grpc.ClientConn) controller.Provisioner { + grpcClient *grpc.ClientConn, + secrets cache.Store, + classes cache.Store) controller.Provisioner { csiClient := csi.NewControllerClient(grpcClient) provisioner := &csiProvisioner{ @@ -231,6 +237,8 @@ func NewCSIProvisioner(client kubernetes.Interface, identity: identity, volumeNamePrefix: volumeNamePrefix, volumeNameUUIDLength: volumeNameUUIDLength, + secrets: secrets, + classes: classes, } return provisioner } @@ -318,7 +326,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis if err != nil { return nil, err } - provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef) + provisionerCredentials, err := getCredentials(p.secrets, provisionerSecretRef) if err != nil { return nil, err } @@ -443,14 +451,25 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error { // get secrets if StorageClass specifies it storageClassName := volume.Spec.StorageClassName if len(storageClassName) != 0 { - if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil { + storageClassObject, exists, err := p.classes.GetByKey(storageClassName) + if err != nil { + return fmt.Errorf("error getting storageclass by storageclass name %s: %v", storageClassName, err) + } else { + if !exists { + return fmt.Errorf("storageclass by storageclass name %s does not exist", storageClassName) + + } + storageClass, ok := storageClassObject.(*apiv1.StorageClass) + if !ok { + return fmt.Errorf("error getting storageclass by storageclass name %s", 
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 25ac5b6d6c..189854287f 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -24,7 +24,7 @@ import (
 	"testing"
 	"time"
 
-	csi "github.com/container-storage-interface/spec/lib/go/csi/v0"
+	"github.com/container-storage-interface/spec/lib/go/csi/v0"
 	"github.com/golang/mock/gomock"
 	"github.com/kubernetes-csi/csi-test/driver"
 	"github.com/kubernetes-incubator/external-storage/lib/controller"
@@ -32,6 +32,7 @@ import (
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	fakeclientset "k8s.io/client-go/kubernetes/fake"
 )
@@ -447,7 +448,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
 	defer mockController.Finish()
 	defer driver.Stop()
 
-	csiProvisioner := NewCSIProvisioner(nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn)
+	csiProvisioner := NewCSIProvisioner(nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, nil)
 
 	// Requested PVC with requestedBytes storage
 	opts := controller.VolumeOptions{
@@ -854,7 +855,9 @@ func TestProvision(t *testing.T) {
 			clientSet = fakeclientset.NewSimpleClientset()
 		}
 
-		csiProvisioner := NewCSIProvisioner(clientSet, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn)
+		informerFactory := informers.NewSharedInformerFactory(clientSet, 15*time.Second)
+		secrets, classes := informerFactory.Core().V1().Secrets().Informer().GetStore(), informerFactory.Storage().V1().StorageClasses().Informer().GetStore()
+		csiProvisioner := NewCSIProvisioner(clientSet, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, secrets, classes)
 
 		out := &csi.CreateVolumeResponse{
 			Volume: &csi.Volume{
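TestProvision now builds real informer-backed stores from the fake clientset and passes their GetStore() results to NewCSIProvisioner. If a test case needs such a lookup to succeed, one option is to seed the stores directly, as in this hypothetical helper (not part of the patch; object names and values are placeholders), since cache.Store.Add computes the same keys GetByKey expects:

// Hypothetical test helper, e.g. alongside controller_test.go: seed the
// informer-backed stores handed to NewCSIProvisioner so cache lookups succeed
// without starting the informer factory.
package controller

import (
	"time"

	"k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	fakeclientset "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func newSeededStores() (secrets cache.Store, classes cache.Store) {
	clientSet := fakeclientset.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(clientSet, 15*time.Second)

	secrets = factory.Core().V1().Secrets().Informer().GetStore()
	classes = factory.Storage().V1().StorageClasses().Informer().GetStore()

	// Adding directly to the stores produces the same keys GetByKey expects:
	// "default/provisioner-secret" and "fast" respectively.
	secrets.Add(&v1.Secret{
		ObjectMeta: metav1.ObjectMeta{Name: "provisioner-secret", Namespace: "default"},
		Data:       map[string][]byte{"credentials": []byte("topsecret")},
	})
	classes.Add(&storagev1.StorageClass{
		ObjectMeta: metav1.ObjectMeta{Name: "fast"},
	})
	return secrets, classes
}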
diff --git a/vendor/github.com/kubernetes-incubator/external-storage/lib/controller/controller.go b/vendor/github.com/kubernetes-incubator/external-storage/lib/controller/controller.go
index 24855622bb..858753e5f5 100644
--- a/vendor/github.com/kubernetes-incubator/external-storage/lib/controller/controller.go
+++ b/vendor/github.com/kubernetes-incubator/external-storage/lib/controller/controller.go
@@ -25,7 +25,6 @@ import (
 	"time"
 
 	"github.com/golang/glog"
-	"github.com/kubernetes-incubator/external-storage/lib/leaderelection"
 	rl "github.com/kubernetes-incubator/external-storage/lib/leaderelection/resourcelock"
 	"k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
@@ -126,10 +125,6 @@ type ProvisionController struct {
 	// when multiple controllers are running: they race to lock (lead) every PVC
 	// so that only one calls Provision for it (saving API calls, CPU cycles...)
 	leaseDuration, renewDeadline, retryPeriod, termLimit time.Duration
-	// Map of claim UID to LeaderElector: for checking if this controller
-	// is the leader of a given claim
-	leaderElectors      map[types.UID]*leaderelection.LeaderElector
-	leaderElectorsMutex *sync.Mutex
 
 	hasRun     bool
 	hasRunLock *sync.Mutex
@@ -360,8 +355,6 @@ func NewProvisionController(
 		renewDeadline:  DefaultRenewDeadline,
 		retryPeriod:    DefaultRetryPeriod,
 		termLimit:      DefaultTermLimit,
-		leaderElectors:      make(map[types.UID]*leaderelection.LeaderElector),
-		leaderElectorsMutex: &sync.Mutex{},
 		hasRun:     false,
 		hasRunLock: &sync.Mutex{},
 	}
@@ -530,23 +523,12 @@ func (ctrl *ProvisionController) addClaim(obj interface{}) {
 	}
 
 	if ctrl.shouldProvision(claim) {
-		ctrl.leaderElectorsMutex.Lock()
-		le, ok := ctrl.leaderElectors[claim.UID]
-		ctrl.leaderElectorsMutex.Unlock()
-		if ok && le.IsLeader() {
-			opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-			ctrl.scheduleOperation(opName, func() error {
-				err := ctrl.provisionClaimOperation(claim)
-				ctrl.updateProvisionStats(claim, err)
-				return err
-			})
-		} else {
-			opName := fmt.Sprintf("lock-provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-			ctrl.scheduleOperation(opName, func() error {
-				ctrl.lockProvisionClaimOperation(claim)
-				return nil
-			})
-		}
+		opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
+		ctrl.scheduleOperation(opName, func() error {
+			err := ctrl.provisionClaimOperation(claim)
+			ctrl.updateProvisionStats(claim, err)
+			return err
+		})
 	}
 }
 
@@ -716,75 +698,6 @@ func (ctrl *ProvisionController) shouldDelete(volume *v1.PersistentVolume) bool
 	return true
 }
 
-// lockProvisionClaimOperation wraps provisionClaimOperation. In case other
-// controllers are serving the same claims, to prevent them all from creating
-// volumes for a claim & racing to submit their PV, each controller creates a
-// LeaderElector to instead race for the leadership (lock), where only the
-// leader is tasked with provisioning & may try to do so
-func (ctrl *ProvisionController) lockProvisionClaimOperation(claim *v1.PersistentVolumeClaim) {
-	stoppedLeading := false
-	rl := rl.ProvisionPVCLock{
-		PVCMeta: claim.ObjectMeta,
-		Client:  ctrl.client,
-		LockConfig: rl.Config{
-			Identity:      string(ctrl.identity),
-			EventRecorder: ctrl.eventRecorder,
-		},
-	}
-	le, err := leaderelection.NewLeaderElector(leaderelection.Config{
-		Lock:          &rl,
-		LeaseDuration: ctrl.leaseDuration,
-		RenewDeadline: ctrl.renewDeadline,
-		RetryPeriod:   ctrl.retryPeriod,
-		TermLimit:     ctrl.termLimit,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(_ <-chan struct{}) {
-				opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-				ctrl.scheduleOperation(opName, func() error {
-					err := ctrl.provisionClaimOperation(claim)
-					ctrl.updateProvisionStats(claim, err)
-					return err
-				})
-			},
-			OnStoppedLeading: func() {
-				stoppedLeading = true
-			},
-		},
-	})
-	if err != nil {
-		glog.Errorf("Error creating LeaderElector, can't provision for claim %q: %v", claimToClaimKey(claim), err)
-		return
-	}
-
-	ctrl.leaderElectorsMutex.Lock()
-	ctrl.leaderElectors[claim.UID] = le
-	ctrl.leaderElectorsMutex.Unlock()
-
-	// To determine when to stop trying to acquire/renew the lock, watch for
-	// provisioning success/failure. (The leader could get the result of its
-	// operation but it has to watch anyway)
-	stopCh := make(chan struct{})
-	successCh, err := ctrl.watchProvisioning(claim, stopCh)
-	if err != nil {
-		glog.Errorf("Error watching for provisioning success, can't provision for claim %q: %v", claimToClaimKey(claim), err)
-	}
-
-	le.Run(successCh)
-
-	close(stopCh)
-
-	// If we were the leader and stopped, give others a chance to acquire
-	// (whether they exist & want to or not). Else, there must have been a
-	// success so just proceed.
-	if stoppedLeading {
-		time.Sleep(ctrl.leaseDuration + ctrl.retryPeriod)
-	}
-
-	ctrl.leaderElectorsMutex.Lock()
-	delete(ctrl.leaderElectors, claim.UID)
-	ctrl.leaderElectorsMutex.Unlock()
-}
-
 func (ctrl *ProvisionController) updateProvisionStats(claim *v1.PersistentVolumeClaim, err error) {
 	ctrl.failedProvisionStatsMutex.Lock()
 	defer ctrl.failedProvisionStatsMutex.Unlock()
@@ -838,12 +751,17 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
 	// A previous doProvisionClaim may just have finished while we were waiting for
 	// the locks. Check that PV (with deterministic name) hasn't been provisioned
 	// yet.
+	nameSpace := claim.GetNamespace()
 	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
-	volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
-	if err == nil && volume != nil {
+	glog.Infof("%+v", ctrl.volumes.ListKeys())
+	_, exists, err := ctrl.volumes.GetByKey(fmt.Sprintf("%s/%s", nameSpace, pvName))
+	if exists {
 		// Volume has been already provisioned, nothing to do.
glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim)) return nil + } else if err != nil { + glog.Errorf("Error getting claim %q's volume: %v", claimToClaimKey(claim), err) + return nil } // Prepare a claimRef to the claim early (to fail before a volume is @@ -884,7 +802,7 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim))) - volume, err = ctrl.provisioner.Provision(options) + volume, err := ctrl.provisioner.Provision(options) if err != nil { if ierr, ok := err.(*IgnoredError); ok { // Provision ignored, do nothing and hope another provisioner will provision it. @@ -1118,10 +1036,17 @@ func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolu // Our check does not have to be as sophisticated as PV controller's, we can // trust that the PV controller has set the PV to Released/Failed and it's // ours to delete - newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{}) - if err != nil { + newVolumeObject, exists, err := ctrl.volumes.GetByKey(volume.Name) + if err != nil || !exists { + return nil + } + + newVolume, ok := newVolumeObject.(*v1.PersistentVolume) + if !ok { + glog.Errorf("error getting persistentvolume %s/%s, skipping", volume.Namespace, volume.Name) return nil } + if !ctrl.shouldDelete(newVolume) { glog.Infof("volume %q no longer needs deletion, skipping", volume.Name) return nil