Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix apiserver throttling issue #104

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 110 additions & 4 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@ import (

"google.golang.org/grpc"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/api/core/v1"

"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
utilversion "k8s.io/kubernetes/pkg/util/version"
)

var (
Expand All @@ -47,8 +56,18 @@ var (
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", 16, "Length in characters for the generated uuid of a created volume")
showVersion = flag.Bool("version", false, "Show version.")

version = "unknown"
Namespace = "default"

informerFactory informers.SharedInformerFactory
resourceLock resourcelock.Interface
provisionController *controller.ProvisionController
version = "unknown"
)

const (
resyncPeriod = 1 * time.Millisecond
sharedResyncPeriod = 1 * time.Second
defaultServerVersion = "v1.6.0"
)

func init() {
Expand Down Expand Up @@ -110,17 +129,104 @@ func init() {
}
time.Sleep(10 * time.Second)
}

informerFactory = informers.NewSharedInformerFactory(clientset, resyncPeriod)
claimInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
volumeInformer := informerFactory.Core().V1().PersistentVolumes().Informer()
classInformer := func() cache.SharedIndexInformer {
if utilversion.MustParseSemantic(serverVersion.GitVersion).AtLeast(utilversion.MustParseSemantic(defaultServerVersion)) {
return informerFactory.Storage().V1().StorageClasses().Informer()
}
return informerFactory.Storage().V1beta1().StorageClasses().Informer()
}()
secretInformer := informerFactory.Core().V1().Secrets().Informer()

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient)
csiProvisioner := ctrl.NewCSIProvisioner(
clientset,
*csiEndpoint,
*connectionTimeout,
identity,
*volumeNamePrefix,
*volumeNameUUIDLength,
grpcClient,
secretInformer.GetStore(),
classInformer.GetStore(),
)
provisionController = controller.NewProvisionController(
clientset,
*provisioner,
csiProvisioner,
serverVersion.GitVersion,
controller.ClaimsInformer(claimInformer),
controller.VolumesInformer(volumeInformer),
controller.ClassesInformer(classInformer),
)

recorder := makeEventRecorder(clientset)

id, err := os.Hostname()
if err != nil {
glog.Fatalf("failed to get hostname: %v", err)
}

resourceLock, err = resourcelock.New(
resourcelock.ConfigMapsResourceLock,
"default",
*provisioner,
clientset.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
},
)
if err != nil {
glog.Fatalf("error creating lock: %v", err)
}
}

func makeEventRecorder(clientset kubernetes.Interface) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
Recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: *provisioner})
eventBroadcaster.StartLogging(glog.Infof)
if clientset != nil {
glog.V(4).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(clientset.CoreV1().RESTClient()).Events(Namespace)})
} else {
glog.Warning("No api server defined - no events will be sent to API server.")
return nil
}
return Recorder
}

func main() {
provisionController.Run(wait.NeverStop)

run := func(stopCh <-chan struct{}) {
go informerFactory.Start(stopCh)
synced := informerFactory.WaitForCacheSync(stopCh)
for tpy, sync := range synced {
if !sync {
glog.Errorf("Wait for shared cache %s sync timeout", tpy)
return
}
}

provisionController.Run(stopCh)
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: resourceLock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leader election lost")
},
},
})

panic("unreached")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for testing ? If so, make sure to remove when done.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I delete test code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this panic should never be reached, update the string to say so i.e. panic("panic reached, leader election failed").

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have deleted the test code.

}
33 changes: 27 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/kubernetes-incubator/external-storage/lib/controller"

"k8s.io/api/core/v1"
apiv1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
Expand All @@ -46,6 +47,7 @@ import (
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi/v0"
"k8s.io/client-go/tools/cache"
)

const (
Expand Down Expand Up @@ -81,6 +83,8 @@ type csiProvisioner struct {
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
secrets cache.Store
classes cache.Store
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -220,7 +224,9 @@ func NewCSIProvisioner(client kubernetes.Interface,
identity string,
volumeNamePrefix string,
volumeNameUUIDLength int,
grpcClient *grpc.ClientConn) controller.Provisioner {
grpcClient *grpc.ClientConn,
secrets cache.Store,
classes cache.Store) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
Expand All @@ -231,6 +237,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
secrets: secrets,
classes: classes,
}
return provisioner
}
Expand Down Expand Up @@ -318,7 +326,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
if err != nil {
return nil, err
}
provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef)
provisionerCredentials, err := getCredentials(p.secrets, provisionerSecretRef)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -443,14 +451,20 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
// get secrets if StorageClass specifies it
storageClassName := volume.Spec.StorageClassName
if len(storageClassName) != 0 {
if storageClass, err := p.client.StorageV1().StorageClasses().Get(storageClassName, metav1.GetOptions{}); err == nil {
storageClassObject, exists, err := p.classes.GetByKey(storageClassName)
glog.Infof("%+v %+v", exists, err)
if err == nil && exists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when err != nil? Add code to handle situations when error occurs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

storageClass, ok := storageClassObject.(*apiv1.StorageClass)
if !ok {
return fmt.Errorf("error getting storageclass by storageclass name %s", storageClassName)
}
// Resolve provision secret credentials.
// No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
provisionerSecretRef, err := getSecretReference(provisionerSecretNameKey, provisionerSecretNamespaceKey, storageClass.Parameters, volume.Name, nil)
if err != nil {
return err
}
credentials, err := getCredentials(p.client, provisionerSecretRef)
credentials, err := getCredentials(p.secrets, provisionerSecretRef)
if err != nil {
return err
}
Expand Down Expand Up @@ -571,14 +585,21 @@ func resolveTemplate(template string, params map[string]string) (string, error)
return resolved, nil
}

func getCredentials(k8s kubernetes.Interface, ref *v1.SecretReference) (map[string]string, error) {
func getCredentials(secrectsStore cache.Store, ref *v1.SecretReference) (map[string]string, error) {
if ref == nil {
return nil, nil
}

secret, err := k8s.CoreV1().Secrets(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
secretObject, exists, err := secrectsStore.GetByKey(fmt.Sprintf("%s/%s", ref.Namespace, ref.Name))
if err != nil {
return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", ref.Name, ref.Namespace, err)
} else if !exists {
return nil, fmt.Errorf("secret %s in namespace %s is not exists", ref.Name, ref.Namespace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should read secret %s in namespace %s does not exist

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

}

secret, ok := secretObject.(*v1.Secret)
if !ok {
return nil, fmt.Errorf("error getting secret %s in namespace %s: %v", ref.Name, ref.Namespace, err)
}

credentials := map[string]string{}
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"testing"
"time"

csi "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/mock/gomock"
"github.com/kubernetes-csi/csi-test/driver"
"github.com/kubernetes-incubator/external-storage/lib/controller"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
)
Expand Down Expand Up @@ -447,7 +448,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer mockController.Finish()
defer driver.Stop()

csiProvisioner := NewCSIProvisioner(nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn)
csiProvisioner := NewCSIProvisioner(nil, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, nil)

// Requested PVC with requestedBytes storage
opts := controller.VolumeOptions{
Expand Down Expand Up @@ -854,7 +855,9 @@ func TestProvision(t *testing.T) {
clientSet = fakeclientset.NewSimpleClientset()
}

csiProvisioner := NewCSIProvisioner(clientSet, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn)
informerFactory := informers.NewSharedInformerFactory(clientSet, 15*time.Second)
secrets, classes := informerFactory.Core().V1().Secrets().Informer().GetStore(), informerFactory.Storage().V1().StorageClasses().Informer().GetStore()
csiProvisioner := NewCSIProvisioner(clientSet, driver.Address(), 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, secrets, classes)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down
Loading