Skip to content

Commit

Permalink
Merge pull request kubernetes-csi#1 from jsafrane/skeleton
Browse files Browse the repository at this point in the history
Add skeleton of the controller + vendoring
  • Loading branch information
jsafrane authored Oct 27, 2017
2 parents c60f3a8 + 0987e29 commit c7ab437
Show file tree
Hide file tree
Showing 5,359 changed files with 1,978,199 additions and 2 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
26 changes: 26 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

.PHONY: all csi-attacher clean test

all: csi-attacher

csi-attacher:
go build -o csi-attacher cmd/csi-attacher/main.go

clean:
-rm -rf csi-attacher

test:
go test `go list ./... | grep -v 'vendor'`
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
# external-attacher-csi
Test
# CSI attacher

The csi-attacher is part of Kubernetes implementation of [Container Storage Interface (CSI)](https://github.com/container-storage-interface/spec).

## Design

In short, it's an external controller that monitors `VolumeAttachment` objects and attaches/detaches volumes to/from nodes. Full design can be found at Kubernetes proposal at https://github.com/kubernetes/community/pull/1258. TODO: update the link after merge.

There is no plan to implement a generic external attacher library, csi-attacher is the only external attacher that exists. If this proves false in future, splitting a generic external-attacher library should be possible with some effort.

## Usage

TBD

## Vendoring

We use [dep](https://github.com/golang/dep) for management of `vendor/`.

`vendor/k8s.io` is manually copied from `staging/` directory of work-in-progress API for CSI, namely https://github.com/kubernetes/kubernetes/pull/54463.
90 changes: 90 additions & 0 deletions cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"flag"
"os"
"os/signal"
"time"

"github.com/kubernetes-csi/external-attacher-csi/pkg/controller"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

const (
// Number of worker threads
threads = 10
)

var (
// TODO: add a better description of attacher name - is it CSI plugin name? Kubernetes plugin name?
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
resync = flag.Duration("resync", 10*time.Second, "Resync interval of the controller.")
connectionTimeout = flag.Duration("connection-timeout", 60, "Timeout for waiting for CSI driver socket (in seconds).")
)

func main() {
flag.Set("logtostderr", "true")
flag.Parse()

// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
panic(err)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
factory := informers.NewSharedInformerFactory(clientset, *resync)

// TODO: wait for CSI's socket and discover 'attacher' and whether the
// driver supports ControllerPulishVolume using ControllerGetCapabilities
attacher := "csi/example"
handler := controller.NewTrivialHandler(clientset)

// Start the provision controller which will dynamically provision NFS PVs
ctrl := controller.NewCSIAttachController(
clientset,
attacher,
handler,
factory.Storage().V1().VolumeAttachments(),
)

// run...
stopCh := make(chan struct{})
factory.Start(stopCh)
go ctrl.Run(threads, stopCh)

// ...until SIGINT
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
close(stopCh)
}

func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
return rest.InClusterConfig()
}
155 changes: 155 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"fmt"
"time"

"github.com/golang/glog"

"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
storageinformerv1 "k8s.io/client-go/informers/storage/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
storagelisterv1 "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)

type CSIAttachController struct {
client kubernetes.Interface
attacherName string
handler Handler
eventRecorder record.EventRecorder
queue workqueue.RateLimitingInterface
syncFunc func(va *storagev1.VolumeAttachment, queue workqueue.RateLimitingInterface) (finished bool, err error)

vaLister storagelisterv1.VolumeAttachmentLister
vaListerSynced cache.InformerSynced
}

// Handler is responsible for handling VolumeAttachment events from informer.
type Handler interface {
// SyncNewOrUpdatedVolumeAttachment processes one Add/Updated event from
// VolumeAttachment informers. It runs in a workqueue and should be
// reasonably fast (i.e. talking to API server is OK, talking to CSI is
// not).
// SyncNewOrUpdatedVolumeAttachment is responsible for marking the
// VolumeAttachment either as forgotten (resets exponential backoff) or
// re-queue it into the provided queue to process it after exponential
// backoff.
SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAttachment, queue workqueue.RateLimitingInterface)
}

// NewCSIAttachController returns a new *CSIAttachController
func NewCSIAttachController(client kubernetes.Interface, attacherName string, handler Handler, volumeAttachmentInformer storageinformerv1.VolumeAttachmentInformer) *CSIAttachController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.Core().Events(v1.NamespaceAll)})
var eventRecorder record.EventRecorder
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-attacher %s", attacherName)})

ctrl := &CSIAttachController{
client: client,
attacherName: attacherName,
handler: handler,
eventRecorder: eventRecorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-attacher"),
}

volumeAttachmentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.vaAdded,
UpdateFunc: ctrl.vaUpdated,
// DeleteFunc: ctrl.vaDeleted, // TODO: do we need this?
})
ctrl.vaLister = volumeAttachmentInformer.Lister()
ctrl.vaListerSynced = volumeAttachmentInformer.Informer().HasSynced

return ctrl
}

func (ctrl *CSIAttachController) Run(workers int, stopCh <-chan struct{}) {
defer ctrl.queue.ShutDown()

glog.Infof("Starting CSI attacher")
defer glog.Infof("Shutting CSI attacher")

if !cache.WaitForCacheSync(stopCh, ctrl.vaListerSynced) {
glog.Errorf("Cannot sync caches")
return
}
for i := 0; i < workers; i++ {
go wait.Until(ctrl.runWorker, time.Second, stopCh)
}

<-stopCh
}

// vaAdded reacts to a VolumeAttachment creation
func (ctrl *CSIAttachController) vaAdded(obj interface{}) {
va := obj.(*storagev1.VolumeAttachment)
ctrl.queue.Add(va.Name)
}

// vaUpdated reacts to a VolumeAttachment update
func (ctrl *CSIAttachController) vaUpdated(old, new interface{}) {
va := new.(*storagev1.VolumeAttachment)
ctrl.queue.Add(va.Name)
}

func (ctrl *CSIAttachController) runWorker() {
for ctrl.processNextWorkItem() {
}
}

// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (ctrl *CSIAttachController) processNextWorkItem() bool {
key, quit := ctrl.queue.Get()
if quit {
return false
}
defer ctrl.queue.Done(key)

vaName := key.(string)
glog.V(4).Infof("work for VolumeAttachment %s started", vaName)

// get VolumeAttachment to process
va, err := ctrl.vaLister.Get(vaName)
if err != nil {
if apierrs.IsNotFound(err) {
// VolumeAttachment was deleted in the meantime, ignore.
// This will remove the VolumeAttachment from queue.
glog.V(4).Infof("%s deleted, ignoring", vaName)
return true
}
if err != nil {
glog.Errorf("Error getting VolumeAttachment %s: %v", vaName, err)
ctrl.queue.AddRateLimited(vaName)
}
}
if va.Spec.Attacher != ctrl.attacherName {
glog.V(4).Infof("Skipping VolumeAttachment %s for attacher %s", va.Name, va.Spec.Attacher)
return true
}
ctrl.handler.SyncNewOrUpdatedVolumeAttachment(va, ctrl.queue)
return true
}
58 changes: 58 additions & 0 deletions pkg/controller/trivial_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"github.com/golang/glog"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
)

// trivialHandler is a handler that marks all VolumeAttachments as attached.
// It's used for CSI drivers that don't support ControllerPulishVolume call.
// It uses no finalizer, deletion of VolumeAttachment is instant (as there is
// nothing to detach).
type trivialHandler struct {
client kubernetes.Interface
}

var _ Handler = &trivialHandler{}

func NewTrivialHandler(client kubernetes.Interface) Handler {
return &trivialHandler{client}
}
func (h *trivialHandler) SyncNewOrUpdatedVolumeAttachment(va *storagev1.VolumeAttachment, queue workqueue.RateLimitingInterface) {
glog.V(4).Infof("Trivial sync[%s] started", va.Name)
if !va.Status.Attached {
// mark as attached
if err := h.markAsAttached(va); err != nil {
glog.Warningf("Error saving VolumeAttachment %s as attached: %s", va.Name, err)
queue.AddRateLimited(va.Name)
return
}
glog.V(2).Infof("Marked VolumeAttachment %s as attached", va.Name)
}
queue.Forget(va.Name)
}

func (h *trivialHandler) markAsAttached(va *storagev1.VolumeAttachment) error {
clone := va.DeepCopy()
clone.Status.Attached = true
_, err := h.client.StorageV1().VolumeAttachments().Update(clone)
return err
}
5 changes: 5 additions & 0 deletions vendor/github.com/PuerkitoBio/purell/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions vendor/github.com/PuerkitoBio/purell/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions vendor/github.com/PuerkitoBio/purell/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c7ab437

Please sign in to comment.