Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External provisioner cleanup #65

Merged
merged 3 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
"github.com/kubernetes-incubator/external-storage/lib/controller"

"google.golang.org/grpc"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -38,7 +40,7 @@ var (
provisioner = flag.String("provisioner", "", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csi-address", "", "The gRPC endpoint for Target CSI Volume")
csiEndpoint = flag.String("csi-address", "/run/csi/socket", "The gRPC endpoint for Target CSI Volume")
connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")
volumeNamePrefix = flag.String("volume-name-prefix", "kubernetes-dynamic-pv", "Prefix to apply to the name of a created volume")
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", 16, "Length in characters for the generated uuid of a created volume")
Expand Down Expand Up @@ -87,9 +89,21 @@ func init() {
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner

// Provisioner will stay in Init until driver opens csi socket, once it's done
// controller will exit this loop and proceed normally.
socketDown := true
grpcClient := &grpc.ClientConn{}
for socketDown {
grpcClient, err = ctrl.Connect(*csiEndpoint, *connectionTimeout)
if err == nil {
socketDown = false
continue
}
time.Sleep(10 * time.Second)
}
// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *csiEndpoint, *connectionTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient)
provisionController = controller.NewProvisionController(
clientset,
*provisioner,
Expand Down
7 changes: 0 additions & 7 deletions main.go

This file was deleted.

84 changes: 55 additions & 29 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ const (
secretNamespaceKey = "csiProvisionerSecretNamespace"
)

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
driverName string
grpcClient *grpc.ClientConn
timeout time.Duration
identity string
volumeNamePrefix string
Expand Down Expand Up @@ -80,7 +81,7 @@ func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
return err
}

func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
func Connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
glog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
Expand All @@ -102,7 +103,7 @@ func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
glog.V(4).Infof("Connection timed out")
return conn, nil // return nil, subsequent GetPluginInfo will show the real connection error
return conn, fmt.Errorf("Connection timed out")
}
if conn.GetState() == connectivity.Ready {
glog.V(3).Infof("Connected")
Expand Down Expand Up @@ -185,34 +186,19 @@ func supportsControllerCreateVolume(conn *grpc.ClientConn, timeout time.Duration
return false, nil
}

func NewCSIProvisioner(client kubernetes.Interface, csiEndpoint string, connectionTimeout time.Duration, identity string, volumeNamePrefix string, volumeNameUUIDLength int) controller.Provisioner {
grpcClient, err := connect(csiEndpoint, connectionTimeout)
if err != nil || grpcClient == nil {
glog.Fatalf("failed to connect to csi endpoint :%v", err)
}
ok, err := supportsPluginControllerService(grpcClient, connectionTimeout)
if err != nil {
glog.Fatalf("failed to get support info :%v", err)
}
if !ok {
glog.Fatalf("no plugin controller service support detected")
}
ok, err = supportsControllerCreateVolume(grpcClient, connectionTimeout)
if err != nil {
glog.Fatalf("failed to get support info :%v", err)
}
if !ok {
glog.Fatalf("no create/delete volume support detected")
}
driver, err := getDriverName(grpcClient, connectionTimeout)
if err != nil {
glog.Fatalf("failed to get driver info :%v", err)
}
// NewCSIProvisioner creates new CSI provisioner
func NewCSIProvisioner(client kubernetes.Interface,
csiEndpoint string,
connectionTimeout time.Duration,
identity string,
volumeNamePrefix string,
volumeNameUUIDLength int,
grpcClient *grpc.ClientConn) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
client: client,
driverName: driver,
grpcClient: grpcClient,
csiClient: csiClient,
timeout: connectionTimeout,
identity: identity,
Expand All @@ -222,10 +208,45 @@ func NewCSIProvisioner(client kubernetes.Interface, csiEndpoint string, connecti
return provisioner
}

// This function get called before any attepmt to communicate with the driver.
// Before initiating Create/Delete API calls provisioner checks if Capabilities:
// PluginControllerService, ControllerCreateVolume sre supported and gets the driver name.
func checkDriverState(grpcClient *grpc.ClientConn, timeout time.Duration) (string, error) {
ok, err := supportsPluginControllerService(grpcClient, timeout)
if err != nil {
glog.Errorf("failed to get support info :%v", err)
return "", err
}
if !ok {
glog.Errorf("no plugin controller service support detected")
return "", fmt.Errorf("no plugin controller service support detected")
}
ok, err = supportsControllerCreateVolume(grpcClient, timeout)
if err != nil {
glog.Errorf("failed to get support info :%v", err)
return "", err
}
if !ok {
glog.Error("no create/delete volume support detected")
return "", fmt.Errorf("no create/delete volume support detected")
}
driverName, err := getDriverName(grpcClient, timeout)
if err != nil {
glog.Errorf("failed to get driver info :%v", err)
return "", err
}
return driverName, nil
}

func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
if options.PVC.Spec.Selector != nil {
return nil, fmt.Errorf("claim Selector is not supported")
}

driverName, err := checkDriverState(p.grpcClient, p.timeout)
if err != nil {
return nil, err
}
// create random share name
share := fmt.Sprintf("%s-%s", p.volumeNamePrefix, strings.Replace(string(uuid.NewUUID()), "-", "", -1)[0:p.volumeNameUUIDLength])
capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
Expand Down Expand Up @@ -283,7 +304,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
// TODO wait for CSI VolumeSource API
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
Driver: p.driverName,
Driver: driverName,
VolumeHandle: p.volumeIdToHandle(rep.Volume.Id),
VolumeAttributes: volumeAttributes,
},
Expand All @@ -304,6 +325,11 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
}
volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

_, err := checkDriverState(p.grpcClient, p.timeout)
if err != nil {
return err
}

req := csi.DeleteVolumeRequest{
VolumeId: volumeId,
}
Expand All @@ -322,7 +348,7 @@ func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

_, err := p.csiClient.DeleteVolume(ctx, &req)
_, err = p.csiClient.DeleteVolume(ctx, &req)

return err
}
Expand Down
60 changes: 59 additions & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type csiConnection struct {
}

func New(address string, timeout time.Duration) (csiConnection, error) {
conn, err := connect(address, timeout)
conn, err := Connect(address, timeout)
if err != nil {
return csiConnection{}, err
}
Expand Down Expand Up @@ -70,6 +70,64 @@ func createMockServer(t *testing.T) (*gomock.Controller,
return mockController, drv, identityServer, controllerServer, csiConn, nil
}

func TestGetPluginName(t *testing.T) {
test := struct {
name string
output []*csi.GetPluginInfoResponse
}{
name: "success",
output: []*csi.GetPluginInfoResponse{
{
Name: "csi/example-1",
VendorVersion: "0.2.0",
Manifest: map[string]string{
"hello": "world",
},
},
{
Name: "csi/example-2",
VendorVersion: "0.2.0",
Manifest: map[string]string{
"hello": "world",
},
},
},
}

mockController, driver, identityServer, _, csiConn, err := createMockServer(t)
if err != nil {
t.Fatal(err)
}
defer mockController.Finish()
defer driver.Stop()

in := &csi.GetPluginInfoRequest{}
out := test.output[0]

identityServer.EXPECT().GetPluginInfo(gomock.Any(), in).Return(out, nil).Times(1)
oldName, err := getDriverName(csiConn.conn, timeout)
if err != nil {
t.Errorf("test %q: Failed to get driver's name", test.name)
}
if oldName != test.output[0].Name {
t.Errorf("test %s: failed, expected %s got %s", test.name, test.output[0].Name, oldName)
}

out = test.output[1]
identityServer.EXPECT().GetPluginInfo(gomock.Any(), in).Return(out, nil).Times(1)
newName, err := getDriverName(csiConn.conn, timeout)
if err != nil {
t.Errorf("test %s: Failed to get driver's name", test.name)
}
if newName != test.output[1].Name {
t.Errorf("test %q: failed, expected %s got %s", test.name, test.output[1].Name, newName)
}

if oldName == newName {
t.Errorf("test: %s failed, driver's names should not match", test.name)
}
}

func TestSupportsControllerCreateVolume(t *testing.T) {

tests := []struct {
Expand Down
17 changes: 17 additions & 0 deletions vendor/github.com/golang/mock/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions vendor/github.com/golang/mock/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions vendor/github.com/golang/mock/AUTHORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions vendor/github.com/golang/mock/CONTRIBUTORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading