Skip to content

Commit

Permalink
add admitPod and PGController
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing (C) committed May 16, 2019
1 parent 00272c6 commit b93f643
Show file tree
Hide file tree
Showing 24 changed files with 581 additions and 205 deletions.
30 changes: 18 additions & 12 deletions cmd/admission/app/configure/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@ import (

// admission-controller server config.
type Config struct {
Master string
Kubeconfig string
CertFile string
KeyFile string
CaCertFile string
Port int
MutateWebhookConfigName string
MutateWebhookName string
ValidateWebhookConfigName string
ValidateWebhookName string
PrintVersion bool
Master string
Kubeconfig string
CertFile string
KeyFile string
CaCertFile string
Port int
MutateWebhookConfigName string
MutateWebhookName string
ValidateWebhookConfigName string
ValidateWebhookName string
ValidateWebhookPodConfigName string
ValidateWebhookPodName string
PrintVersion bool
}

func NewConfig() *Config {
Expand All @@ -62,9 +64,13 @@ func (c *Config) AddFlags() {
flag.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "mutatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "volcano-validate-job",
"Name of the mutatingwebhookconfiguration resource in Kubernetes.")
"Name of the validatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "validatejob.volcano.sh",
"Name of the webhook entry in the webhook config.")
flag.StringVar(&c.ValidateWebhookPodConfigName, "validate-webhook-pod-config-name", "volcano-validate-pod",
"Name of the pod validatingwebhookconfiguration resource in Kubernetes.")
flag.StringVar(&c.ValidateWebhookPodName, "validate-webhook-pod-name", "validatepod.volcano.sh",
"Name of the pod webhook entry in the webhook config.")
flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit")
}

Expand Down
64 changes: 13 additions & 51 deletions cmd/admission/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,16 @@ package app

import (
"crypto/tls"
"encoding/json"
"io/ioutil"
"net/http"

"github.com/golang/glog"

"k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

appConf "volcano.sh/volcano/cmd/admission/app/configure"
admissioncontroller "volcano.sh/volcano/pkg/admission"
)
kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"

const (
CONTENTTYPE = "Content-Type"
APPLICATIONJSON = "application/json"
appConf "volcano.sh/volcano/cmd/admission/app/configure"
)

// Get a clientset with restConfig.
Expand Down Expand Up @@ -76,50 +68,20 @@ func ConfigTLS(config *appConf.Config, restConfig *restclient.Config) *tls.Confi
return &tls.Config{}
}

func Serve(w http.ResponseWriter, r *http.Request, admit admissioncontroller.AdmitFunc) {
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}

// verify the content type is accurate
contentType := r.Header.Get(CONTENTTYPE)
if contentType != APPLICATIONJSON {
glog.Errorf("contentType=%s, expect application/json", contentType)
return
}
func GetKubebatchClient(config *appConf.Config) (*kbver.Clientset, error) {
var cfg *restclient.Config
var err error

var reviewResponse *v1beta1.AdmissionResponse
ar := v1beta1.AdmissionReview{}
deserializer := admissioncontroller.Codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
reviewResponse = admissioncontroller.ToAdmissionResponse(err)
master := config.Master
kubeconfig := config.Kubeconfig
if master != "" || kubeconfig != "" {
cfg, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
} else {
reviewResponse = admit(ar)
cfg, err = restclient.InClusterConfig()
}
glog.V(3).Infof("sending response: %v", reviewResponse)

response := createResponse(reviewResponse, &ar)
resp, err := json.Marshal(response)
if err != nil {
glog.Error(err)
}
if _, err := w.Write(resp); err != nil {
glog.Error(err)
}
}

func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview {
response := v1beta1.AdmissionReview{}
if reviewResponse != nil {
response.Response = reviewResponse
response.Response.UID = ar.Request.UID
return nil, err
}
// reset the Object and OldObject, they are not needed in a response.
ar.Request.Object = runtime.RawExtension{}
ar.Request.OldObject = runtime.RawExtension{}

return response
return kbver.NewForConfigOrDie(cfg), nil
}
33 changes: 29 additions & 4 deletions cmd/admission/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@ import (
"os"
"strconv"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"volcano.sh/volcano/cmd/admission/app"
appConf "volcano.sh/volcano/cmd/admission/app/configure"
admissioncontroller "volcano.sh/volcano/pkg/admission"
"volcano.sh/volcano/pkg/version"

"k8s.io/client-go/tools/clientcmd"
)

func serveJobs(w http.ResponseWriter, r *http.Request) {
app.Serve(w, r, admissioncontroller.AdmitJobs)
admissioncontroller.Serve(w, r, admissioncontroller.AdmitJobs)
}

func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
app.Serve(w, r, admissioncontroller.MutateJobs)
admissioncontroller.Serve(w, r, admissioncontroller.MutateJobs)
}

func main() {
Expand Down Expand Up @@ -65,6 +66,11 @@ func main() {

clientset := app.GetClient(restConfig)

if err := servePods(clientset, config); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}

caCertPem, err := ioutil.ReadFile(config.CaCertFile)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
Expand All @@ -78,6 +84,10 @@ func main() {
config.ValidateWebhookConfigName, config.ValidateWebhookName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
if err = appConf.PatchValidateWebhookConfig(clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
config.ValidateWebhookPodConfigName, config.ValidateWebhookPodName, caCertPem); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
}

server := &http.Server{
Expand All @@ -86,3 +96,18 @@ func main() {
}
server.ListenAndServeTLS("", "")
}

func servePods(clientset *kubernetes.Clientset, config *appConf.Config) error {
kbClientset, err := app.GetKubebatchClient(config)
if err != nil {
return err
}

admController := &admissioncontroller.Controller{
KubeClients: clientset,
KbClients: kbClientset,
}
http.HandleFunc(admissioncontroller.AdmitPodPath, admController.ServerPods)

return nil
}
3 changes: 3 additions & 0 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/controllers/podgroup"
"volcano.sh/volcano/pkg/controllers/queue"
)

Expand Down Expand Up @@ -85,11 +86,13 @@ func Run(opt *options.ServerOption) error {
jobController := job.NewJobController(kubeClient, kbClient, vkClient)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, kbClient, vkClient)

run := func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
go pgController.Run(ctx.Done())
<-ctx.Done()
}

Expand Down
25 changes: 25 additions & 0 deletions installer/chart/volcano/templates/admission-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,28 @@ webhooks:
- CREATE
resources:
- jobs
---
apiVersion: admissionregistration.k8s.io/v1beta1
kind: ValidatingWebhookConfiguration
metadata:
name: {{ .Release.Name }}-validate-pod
annotations:
"helm.sh/hook": pre-install,pre-upgrade,post-delete
webhooks:
- clientConfig:
service:
name: {{ .Release.Name }}-admission-service
namespace: {{ .Release.Namespace }}
path: /pods
failurePolicy: Ignore
name: validatepod.volcano.sh
namespaceSelector: {}
rules:
- apiGroups:
- ""
apiVersions:
- "v1"
operations:
- CREATE
resources:
- pods
3 changes: 3 additions & 0 deletions installer/chart/volcano/templates/admission.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ rules:
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations"]
verbs: ["get", "list", "watch", "patch"]
- apiGroups: ["scheduling.incubator.k8s.io"]
resources: ["podgroups"]
verbs: ["get", "list", "watch"]

---
kind: ClusterRoleBinding
Expand Down
14 changes: 12 additions & 2 deletions pkg/admission/admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,29 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/kubernetes"

kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"

"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

const (
AdmitJobPath = "/jobs"
MutateJobPath = "/mutating-jobs"
PVCInputName = "volcano.sh/job-input"
PVCOutputName = "volcano.sh/job-output"
AdmitPodPath = "/pods"

CONTENTTYPE = "Content-Type"
APPLICATIONJSON = "application/json"
)

type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse

type Controller struct {
KubeClients *kubernetes.Clientset
KbClients *kbver.Clientset
}

var scheme = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(scheme)

Expand Down
Loading

0 comments on commit b93f643

Please sign in to comment.