Skip to content

Commit

Permalink
Merge pull request #1 from rootfs/dev
Browse files Browse the repository at this point in the history
[WIP] provision PV
  • Loading branch information
rootfs authored Nov 17, 2017
2 parents 850b9c5 + 578fa66 commit 273afbe
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 76 deletions.
22 changes: 20 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ quick-container:

provisioner:
mkdir -p _output
go build -o _output/csi-provisioner ./cmd/csi-provisioner/
go build -i -o _output/csi-provisioner ./cmd/csi-provisioner/
.PHONY: provisioner

all build: provisioner
Expand Down
94 changes: 21 additions & 73 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2016 The Kubernetes Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,40 +17,36 @@ limitations under the License.
package main

import (
//"fmt"
//"os"
"flag"
"math/rand"
"os"
"strconv"
"time"

"github.com/golang/glog"
"k8s.io/api/core/v1"
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
"github.com/kubernetes-incubator/external-storage/lib/controller"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"os"
"time"
"strconv"
"math/rand"
)

var (
provisioner = flag.String("provisioner", "k8s.io/default", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("CSI-Endpoint", "/tmp/csi.sock", "The gRPC endpoint for Target CSI Volume")
)

type csiProvisioner struct {
client kubernetes.Interface
execCommand string
identity string
config *rest.Config
}
provisioner = flag.String("provisioner", "k8s.io/default", "Name of the provisioner. The provisioner will only provision volumes for claims that request a StorageClass with a provisioner field set equal to this name.")
master = flag.String("master", "", "Master URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeconfig = flag.String("kubeconfig", "/var/run/kubernetes/admin.kubeconfig", "Absolute path to the kubeconfig file. Either this or master needs to be set if the provisioner is being run out of cluster.")
csiEndpoint = flag.String("csiendpoint", "/tmp/csi.sock", "The gRPC endpoint for Target CSI Volume")
connectionTimeout = flag.Duration("connection-timeout", 10*time.Second, "Timeout for waiting for CSI driver socket.")

var provisionController *controller.ProvisionController
provisionController *controller.ProvisionController
)

func init() {
var config *rest.Config
var err error

flag.Parse()
flag.Set("logtostderr", "true")
Expand All @@ -63,22 +59,6 @@ func init() {
kubeconfig = &kubeconfigEnv
}

glog.Infof("CSI Provisioner %s specified", *provisioner)



if csiEndpoint == nil {
csiEndpointEnv := os.Getenv("CSI_ENDPOINT")
if csiEndpointEnv != "" {
csiEndpoint = &csiEndpointEnv;
} else {
glog.Fatalf("No CSI Volume Endpoint defined.. Can be provided via flag (--CSI-Endpoint) or by setting the environment variable CSI_ENDPOINT..")

}
}
var config *rest.Config
var err error

if *master != "" || *kubeconfig != "" {
glog.Infof("Either master or kubeconfig specified. building kube config from that..")
config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
Expand All @@ -105,52 +85,20 @@ func init() {
}

// Generate a unique ID for this provisioner
timeStamp:=time.Now().UnixNano() / int64(time.Millisecond)
identity:= strconv.FormatInt(timeStamp,10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner;
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + *provisioner

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := NewCSIProvisioner(clientset, *csiEndpoint, identity)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *csiEndpoint, *connectionTimeout, identity)
provisionController = controller.NewProvisionController(
clientset,
*provisioner,
csiProvisioner,
serverVersion.GitVersion,
)

}



func NewCSIProvisioner(client kubernetes.Interface, execCommand string, identity string) controller.Provisioner {
return newCSIProvisionerInternal(client, execCommand, identity)
}

func newCSIProvisionerInternal(client kubernetes.Interface, execCommand string, identity string) *csiProvisioner {

provisioner := &csiProvisioner{
client: client,
execCommand: execCommand,
identity: identity,
}

return provisioner
}

func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
glog.Infof("Provisioner %s Provision(..) called..", *provisioner)
return nil, nil
}

func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
glog.Infof("Provisioner %s Delete(..) called..", *provisioner)
return nil
}

var _ controller.Provisioner = &csiProvisioner{}

func main() {

provisionController.Run(wait.NeverStop)

}
148 changes: 148 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"net"
"strings"
"time"

"github.com/golang/glog"

"github.com/kubernetes-incubator/external-storage/lib/controller"

"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

"github.com/container-storage-interface/spec/lib/go/csi"
)

type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
timeout time.Duration
identity string
config *rest.Config
}

var _ controller.Provisioner = &csiProvisioner{}

// Version of CSI this client implements
var (
csiVersion = csi.Version{
Major: 0,
Minor: 1,
Patch: 0,
}
accessMode = &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
}
)

// from external-attacher/pkg/connection
func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
glog.V(5).Infof("GRPC call: %s", method)
glog.V(5).Infof("GRPC request: %+v", req)
err := invoker(ctx, method, req, reply, cc, opts...)
glog.V(5).Infof("GRPC response: %+v", reply)
glog.V(5).Infof("GRPC error: %v", err)
return err
}

func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) {
glog.V(2).Infof("Connecting to %s", address)
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithUnaryInterceptor(logGRPC),
}
if strings.HasPrefix(address, "/") {
dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
}
conn, err := grpc.Dial(address, dialOptions...)

if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
if !conn.WaitForStateChange(ctx, conn.GetState()) {
glog.V(4).Infof("Connection timed out")
return conn, nil // return nil, subsequent GetPluginInfo will show the real connection error
}
if conn.GetState() == connectivity.Ready {
glog.V(3).Infof("Connected")
return conn, nil
}
glog.V(4).Infof("Still trying, connection is %s", conn.GetState())
}
}

func NewCSIProvisioner(client kubernetes.Interface, csiEndpoint string, connectionTimeout time.Duration, identity string) controller.Provisioner {
grpcClient, err := connect(csiEndpoint, connectionTimeout)
if err != nil || grpcClient == nil {
glog.Fatalf("failed to connect to csi endpoint :%v", err)
}

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
client: client,
csiClient: csiClient,
timeout: connectionTimeout,
identity: identity,
}
return provisioner
}

func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
glog.Infof("Provisioner %s Provision(..) called..")
req := csi.CreateVolumeRequest{
Name: "mypv",
Version: &csiVersion,
VolumeCapabilities: []*csi.VolumeCapability{
&csi.VolumeCapability{
AccessMode: accessMode,
},
},
CapacityRange: &csi.CapacityRange{
RequiredBytes: 100,
LimitBytes: 100,
},
}
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

_, err := p.csiClient.CreateVolume(ctx, &req)
if err != nil {
return nil, err
}

return nil, nil
}

func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
glog.Infof("Provisioner %s Delete(..) called..")
return nil
}

0 comments on commit 273afbe

Please sign in to comment.