Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix apiserver throttling issue #104

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 108 additions & 4 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@ import (

"google.golang.org/grpc"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/api/core/v1"

"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
utilversion "k8s.io/kubernetes/pkg/util/version"
)

var (
Expand All @@ -47,8 +56,18 @@ var (
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", 16, "Length in characters for the generated uuid of a created volume")
showVersion = flag.Bool("version", false, "Show version.")

version = "unknown"
Namespace = "default"

informerFactory informers.SharedInformerFactory
resourceLock resourcelock.Interface
provisionController *controller.ProvisionController
version = "unknown"
)

const (
resyncPeriod = 1 * time.Millisecond
sharedResyncPeriod = 1 * time.Second
defaultServerVersion = "v1.6.0"
)

func init() {
Expand Down Expand Up @@ -110,17 +129,102 @@ func init() {
}
time.Sleep(10 * time.Second)
}

informerFactory = informers.NewSharedInformerFactory(clientset, resyncPeriod)
claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
volumeInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
classInformer := func() cache.SharedIndexInformer {
if utilversion.MustParseSemantic(serverVersion.GitVersion).AtLeast(utilversion.MustParseSemantic(defaultServerVersion)) {
return informerFactory.Storage().V1().StorageClasses().Informer()
}
return informerFactory.Storage().V1beta1().StorageClasses().Informer()
}()
secretInformer := informerFactory.Core().V1().Secrets().Informer()

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient)
csiProvisioner := ctrl.NewCSIProvisioner(
clientset,
*csiEndpoint,
*connectionTimeout,
identity,
*volumeNamePrefix,
*volumeNameUUIDLength,
grpcClient,
secretInformer.GetStore(),
classInformer.GetStore(),
)
provisionController = controller.NewProvisionController(
clientset,
*provisioner,
csiProvisioner,
serverVersion.GitVersion,
controller.ClaimsInformer(claimInformer),
controller.VolumesInformer(volumeInformer),
controller.ClassesInformer(classInformer),
)

recorder := makeEventRecorder(clientset)

id, err := os.Hostname()
if err != nil {
glog.Fatalf("failed to get hostname: %v", err)
}

resourceLock, err = resourcelock.New(
resourcelock.ConfigMapsResourceLock,
"default",
*provisioner,
clientset.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
},
)
if err != nil {
glog.Fatalf("error creating lock: %v", err)
}
}

func makeEventRecorder(clientset kubernetes.Interface) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
Recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: *provisioner})
eventBroadcaster.StartLogging(glog.Infof)
if clientset != nil {
glog.V(4).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientset.CoreV1().RESTClient()).Events(Namespace)})
} else {
glog.Warning("No api server defined - no events will be sent to API server.")
return nil
}
return Recorder
}

func main() {
provisionController.Run(wait.NeverStop)

run := func(stopCh <-chan struct{}) {
go informerFactory.Start(stopCh)
synced := informerFactory.WaitForCacheSync(stopCh)
for tpy, sync := range synced {
if !sync {
glog.Errorf("Wait for shared cache %s sync timeout", tpy)
return
}
}

provisionController.Run(stopCh)
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: resourceLock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leader election lost")
},
},
})
}
38 changes: 32 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/kubernetes-incubator/external-storage/lib/controller"

"k8s.io/api/core/v1"
apiv1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
Expand All @@ -46,6 +47,7 @@ import (
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi/v0"
"k8s.io/client-go/tools/cache"
)

const (
Expand Down Expand Up @@ -81,6 +83,8 @@ type csiProvisioner struct {
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
secrets cache.Store
classes cache.Store
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -220,7 +224,9 @@ func NewCSIProvisioner(client kubernetes.Interface,
identity string,
volumeNamePrefix string,
volumeNameUUIDLength int,
grpcClient *grpc.ClientConn) controller.Provisioner {
grpcClient *grpc.ClientConn,
secrets cache.Store,
classes cache.Store) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
Expand All @@ -231,6 +237,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
secrets: secrets,
classes: classes,
}
return provisioner
}
Expand Down Expand Up @@ -318,7 +326,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
if err != nil {
return nil, err
}
provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef)
provisionerCredentials, err := getCredentials(p.secrets, provisionerSecretRef)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -443,14 +451,25 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
// get secrets if StorageClass specifies it
storageClassName := volume.Spec.StorageClassName
if len(storageClassName) != 0 {
if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
storageClassObject, exists, err := p.classes.GetByKey(storageClassName)
if err != nil {
return fmt.Errorf("error getting storageclass by storageclass name %s: %v", storageClassName, err)
} else {
if !exists {
return fmt.Errorf("storageclass by storageclass name %s does not exist", storageClassName)

}
storageClass, ok := storageClassObject.(*apiv1.StorageClass)
if !ok {
return fmt.Errorf("error getting storageclass by storageclass name %s", storageClassName)
}
// Resolve provision secret credentials.
// No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
if err != nil {
return err
}
credentials, err := getCredentials(p.client, provisionerSecretRef)
credentials, err := getCredentials(p.secrets, provisionerSecretRef)
if err != nil {
return err
}
Expand Down Expand Up @@ -571,14 +590,21 @@ func resolveTemplate(template string, params map[string]string) (string, error)
return resolved, nil
}

func getCredentials(k8s kubernetes.Interface, ref *v1.SecretReference) (map[string]string, error) {
func getCredentials(secrectsStore cache.Store, ref *v1.SecretReference) (map[string]string, error) {
if ref == nil {
return nil, nil
}

secret, err := k8s.CoreV1().Secrets(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
secretObject, exists, err := secrectsStore.GetByKey(fmt.Sprintf("%s/%s", ref.Namespace, ref.Name))
if err != nil {
return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", ref.Name, ref.Namespace, err)
} else if !exists {
return nil, fmt.Errorf("secret %s in namespace %s does not exists", ref.Name, ref.Namespace)
}

secret, ok := secretObject.(*v1.Secret)
if !ok {
return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", ref.Name, ref.Namespace, err)
}

credentials := map[string]string{}
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"testing"
"time"

csi "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/mock/gomock"
"github.com/kubernetes-csi/csi-test/driver"
"github.com/kubernetes-incubator/external-storage/lib/controller"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
)
Expand Down Expand Up @@ -447,7 +448,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer mockController.Finish()
defer driver.Stop()

csiProvisioner := NewCSIProvisioner(nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn)
csiProvisioner := NewCSIProvisioner(nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, nil)

// Requested PVC with requestedBytes storage
opts := controller.VolumeOptions{
Expand Down Expand Up @@ -854,7 +855,9 @@ func TestProvision(t *testing.T) {
clientSet = fakeclientset.NewSimpleClientset()
}

csiProvisioner := NewCSIProvisioner(clientSet, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn)
informerFactory := informers.NewSharedInformerFactory(clientSet, 15*time.Second)
secrets, classes := informerFactory.Core().V1().Secrets().Informer().GetStore(), informerFactory.Storage().V1().StorageClasses().Informer().GetStore()
csiProvisioner := NewCSIProvisioner(clientSet, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, secrets, classes)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down
Loading