Skip to content

Commit

Permalink
add admitPod and PGController
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing (C) authored and wangyuqing4 committed Jul 17, 2019
1 parent 560a537 commit 52d08de
Show file tree
Hide file tree
Showing 36 changed files with 1,131 additions and 683 deletions.
42 changes: 42 additions & 0 deletions cmd/admission/app/configure/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {
PrintVersion bool
AdmissionServiceName string
AdmissionServiceNamespace string
SchedulerName string
}

// NewConfig create new config
Expand Down Expand Up @@ -73,6 +74,7 @@ func (c *Config) AddFlags() {
flag.BoolVar(&c.PrintVersion, "version", false, "Show version and quit")
flag.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook")
flag.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service")
flag.StringVar(&c.SchedulerName, "scheduler-name", "volcano", "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
}

const (
Expand All @@ -84,6 +86,10 @@ const (
ValidateHookName = "validatejob.volcano.sh"
// MutateHookName Default name for webhooks in MutatingWebhookConfiguration
MutateHookName = "mutatejob.volcano.sh"
// ValidatePodConfigName ValidatingWebhookPodConfiguration name format
ValidatePodConfigName = "%s-validate-pod"
// ValidatePodHookName Default name for webhooks in ValidatingWebhookPodConfiguration
ValidatePodHookName = "validatepod.volcano.sh"
)

// CheckPortOrDie check valid port range
Expand Down Expand Up @@ -177,6 +183,42 @@ func RegisterWebhooks(c *Config, clienset *kubernetes.Clientset, cabundle []byte
return err
}

// Prepare validate pods
path = "/pods"
PodValidateHooks := v1beta1.ValidatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired("",
fmt.Sprintf(ValidatePodConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired("", ValidatePodHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create},
Rule: v1beta1.Rule{
APIGroups: []string{""},
APIVersions: []string{"v1"},
Resources: []string{"pods"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}

if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
[]v1beta1.ValidatingWebhookConfiguration{PodValidateHooks}); err != nil {
return err
}

return nil

}
Expand Down
68 changes: 2 additions & 66 deletions cmd/admission/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,13 @@ package app

import (
"crypto/tls"
"encoding/json"
"io/ioutil"
"net/http"

"github.com/golang/glog"
"volcano.sh/volcano/pkg/client/clientset/versioned"

"k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"

appConf "volcano.sh/volcano/cmd/admission/app/configure"
admissioncontroller "volcano.sh/volcano/pkg/admission"
)

const (
//CONTENTTYPE http content-type
CONTENTTYPE = "Content-Type"

//APPLICATIONJSON json content
APPLICATIONJSON = "application/json"
"volcano.sh/volcano/pkg/client/clientset/versioned"
)

// GetClient Get a clientset with restConfig.
Expand All @@ -51,7 +36,7 @@ func GetClient(restConfig *restclient.Config) *kubernetes.Clientset {
return clientset
}

//GetKubeBatchClient get a clientset for kubebatch
// GetKubeBatchClient get a clientset for volcano
func GetKubeBatchClient(restConfig *restclient.Config) *versioned.Clientset {
clientset, err := versioned.NewForConfig(restConfig)
if err != nil {
Expand Down Expand Up @@ -89,52 +74,3 @@ func ConfigTLS(config *appConf.Config, restConfig *restclient.Config) *tls.Confi
glog.Fatal("tls: failed to find any tls config data")
return &tls.Config{}
}

//Serve the http request
func Serve(w http.ResponseWriter, r *http.Request, admit admissioncontroller.AdmitFunc) {
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}

// verify the content type is accurate
contentType := r.Header.Get(CONTENTTYPE)
if contentType != APPLICATIONJSON {
glog.Errorf("contentType=%s, expect application/json", contentType)
return
}

var reviewResponse *v1beta1.AdmissionResponse
ar := v1beta1.AdmissionReview{}
deserializer := admissioncontroller.Codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
reviewResponse = admissioncontroller.ToAdmissionResponse(err)
} else {
reviewResponse = admit(ar)
}
glog.V(3).Infof("sending response: %v", reviewResponse)

response := createResponse(reviewResponse, &ar)
resp, err := json.Marshal(response)
if err != nil {
glog.Error(err)
}
if _, err := w.Write(resp); err != nil {
glog.Error(err)
}
}

func createResponse(reviewResponse *v1beta1.AdmissionResponse, ar *v1beta1.AdmissionReview) v1beta1.AdmissionReview {
response := v1beta1.AdmissionReview{}
if reviewResponse != nil {
response.Response = reviewResponse
response.Response.UID = ar.Request.UID
}
// reset the Object and OldObject, they are not needed in a response.
ar.Request.Object = runtime.RawExtension{}
ar.Request.OldObject = runtime.RawExtension{}

return response
}
19 changes: 16 additions & 3 deletions cmd/admission/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ package main

import (
"flag"
"github.com/golang/glog"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/golang/glog"

"k8s.io/client-go/tools/clientcmd"

"volcano.sh/volcano/cmd/admission/app"
Expand All @@ -34,11 +35,11 @@ import (
)

func serveJobs(w http.ResponseWriter, r *http.Request) {
app.Serve(w, r, admissioncontroller.AdmitJobs)
admissioncontroller.Serve(w, r, admissioncontroller.AdmitJobs)
}

func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
app.Serve(w, r, admissioncontroller.MutateJobs)
admissioncontroller.Serve(w, r, admissioncontroller.MutateJobs)
}

func main() {
Expand All @@ -65,6 +66,8 @@ func main() {

admissioncontroller.KubeBatchClientSet = app.GetKubeBatchClient(restConfig)

servePods(config)

caBundle, err := ioutil.ReadFile(config.CaCertFile)
if err != nil {
glog.Fatalf("Unable to read cacert file: %v\n", err)
Expand Down Expand Up @@ -101,3 +104,13 @@ func main() {
return
}
}

func servePods(config *appConf.Config) {
admController := &admissioncontroller.Controller{
KbClients: admissioncontroller.KubeBatchClientSet,
SchedulerName: config.SchedulerName,
}
http.HandleFunc(admissioncontroller.AdmitPodPath, admController.ServerPods)

return
}
13 changes: 9 additions & 4 deletions cmd/controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
)

const (
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultQPS = 50.0
defaultBurst = 100
defaultWorkers = 3
defaultSchedulerName = "volcano"
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -39,7 +40,9 @@ type ServerOption struct {
PrintVersion bool
// WorkerThreads is the number of threads syncing job operations
// concurrently. Larger number = faster job updating,but more CPU load.
WorkerThreads uint32
WorkerThreads uint32
EnablePodgroupController bool
SchedulerName string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -60,6 +63,8 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
fs.BoolVar(&s.EnablePodgroupController, "enable-podgroup-controller", false, "Normal job use volcano scheduler will enable pg controller")
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
}

// CheckOptionOrDie checks the LockObjectNamespace
Expand Down
1 change: 1 addition & 0 deletions cmd/controllers/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestAddFlags(t *testing.T) {
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
}

if !reflect.DeepEqual(expected, s) {
Expand Down
8 changes: 6 additions & 2 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

kbver "volcano.sh/volcano/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controllers/app/options"
kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
"volcano.sh/volcano/pkg/controllers/job"
"volcano.sh/volcano/pkg/controllers/podgroup"
"volcano.sh/volcano/pkg/controllers/queue"
)

Expand Down Expand Up @@ -88,11 +88,15 @@ func Run(opt *options.ServerOption) error {
jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName)

run := func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
if opt.EnablePodgroupController {
go pgController.Run(ctx.Done())
}
<-ctx.Done()
}

Expand Down
44 changes: 0 additions & 44 deletions hack/e2e-admission-config.yaml

This file was deleted.

4 changes: 3 additions & 1 deletion hack/run-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ function install-volcano {
kind load docker-image ${MPI_EXAMPLE_IMAGE} ${CLUSTER_CONTEXT}

echo "Install volcano chart"
helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=kube-batch-ci.conf --wait
helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} \
--set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=kube-batch-ci.conf --set basic.enable_podgroup_controller=true \
--wait
}

function uninstall-volcano {
Expand Down
3 changes: 3 additions & 0 deletions installer/helm/chart/volcano/templates/admission.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ rules:
- apiGroups: [""]
resources: ["services"]
verbs: ["get"]
- apiGroups: ["scheduling.incubator.k8s.io"]
resources: ["podgroups"]
verbs: ["get", "list", "watch"]

---
kind: ClusterRoleBinding
Expand Down
1 change: 1 addition & 0 deletions installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ spec:
image: {{.Values.basic.controller_image_name}}:{{.Values.basic.image_tag_version}}
args:
- --alsologtostderr
- --enable-podgroup-controller={{.Values.basic.enable_podgroup_controller}}
- -v=4
- 2>&1
imagePullPolicy: "IfNotPresent"
1 change: 1 addition & 0 deletions installer/helm/chart/volcano/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ basic:
admission_secret_name: "volcano-admission-secret"
scheduler_config_file: "kube-batch.conf"
image_pull_secret: ""
enable_podgroup_controller: false
Loading

0 comments on commit 52d08de

Please sign in to comment.