diff --git a/pkg/controllers/job/plugins/svc/const.go b/pkg/controllers/job/plugins/svc/const.go index 526500468ae..6dd7ea5348d 100644 --- a/pkg/controllers/job/plugins/svc/const.go +++ b/pkg/controllers/job/plugins/svc/const.go @@ -19,6 +19,10 @@ package svc const ( // ConfigMapTaskHostFmt key in config map ConfigMapTaskHostFmt = "%s.host" + // EnvTaskHostFmt is the key for host list in environment + EnvTaskHostFmt = "VC_%s_HOSTS" + // EnvHostNumFmt is the key for host number in environment + EnvHostNumFmt = "VC_%s_NUM" // ConfigMapMountPath mount path ConfigMapMountPath = "/etc/volcano" diff --git a/pkg/controllers/job/plugins/svc/svc.go b/pkg/controllers/job/plugins/svc/svc.go index 54c79a47a81..2d92594d0ee 100644 --- a/pkg/controllers/job/plugins/svc/svc.go +++ b/pkg/controllers/job/plugins/svc/svc.go @@ -19,6 +19,7 @@ package svc import ( "flag" "fmt" + "strconv" "strings" "k8s.io/klog" @@ -43,6 +44,9 @@ type servicePlugin struct { // flag parse args publishNotReadyAddresses bool + + // the host info of each task + hostEnv map[string]string } // New creates service plugin @@ -88,6 +92,19 @@ func (sp *servicePlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error { pod.Spec.Subdomain = job.Name } + var hostEnv []v1.EnvVar + for k, v := range sp.hostEnv { + hostEnv = append(hostEnv, v1.EnvVar{Name: k, Value: v}) + } + + for i := range pod.Spec.Containers { + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, hostEnv...) + } + + for i := range pod.Spec.InitContainers { + pod.Spec.InitContainers[i].Env = append(pod.Spec.InitContainers[i].Env, hostEnv...) + } + sp.mountConfigmap(pod, job) return nil @@ -98,9 +115,13 @@ func (sp *servicePlugin) OnJobAdd(job *batch.Job) error { return nil } - data := generateHost(job) + hostFile, hostEnv := generateHosts(job) - if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, data, sp.cmName(job)); err != nil { + // Add hosts as environment when creating pods. + sp.hostEnv = hostEnv + + // Create ConfigMap of hosts for Pods to mount. + if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, hostFile, sp.cmName(job)); err != nil { return err } @@ -258,8 +279,9 @@ func (sp *servicePlugin) cmName(job *batch.Job) string { return fmt.Sprintf("%s-%s", job.Name, sp.Name()) } -func generateHost(job *batch.Job) map[string]string { - data := make(map[string]string, len(job.Spec.Tasks)) +func generateHosts(job *batch.Job) (hostFile map[string]string, hostEnv map[string]string) { + hostFile = make(map[string]string, len(job.Spec.Tasks)) + hostEnv = make(map[string]string, len(job.Spec.Tasks)) for _, ts := range job.Spec.Tasks { hosts := make([]string, 0, ts.Replicas) @@ -280,8 +302,16 @@ func generateHost(job *batch.Job) map[string]string { } key := fmt.Sprintf(ConfigMapTaskHostFmt, ts.Name) - data[key] = strings.Join(hosts, "\n") + hostFile[key] = strings.Join(hosts, "\n") + + // TODO(k82cn): The splitter and the prefix of env should be configurable. + // export hosts as environment + key = fmt.Sprintf(EnvTaskHostFmt, strings.ToUpper(ts.Name)) + hostEnv[key] = strings.Join(hosts, ",") + // export host number as environment. + key = fmt.Sprintf(EnvHostNumFmt, strings.ToUpper(ts.Name)) + hostEnv[key] = strconv.Itoa(len(hosts)) } - return data + return } diff --git a/test/e2e/job_plugins.go b/test/e2e/job_plugins.go index d8fa27e14e1..0d78c95637e 100644 --- a/test/e2e/job_plugins.go +++ b/test/e2e/job_plugins.go @@ -18,13 +18,18 @@ package e2e import ( "fmt" + "strings" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + cv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/scheduler/api" + "volcano.sh/volcano/pkg/controllers/job/helpers" "volcano.sh/volcano/pkg/controllers/job/plugins/env" + "volcano.sh/volcano/pkg/controllers/job/plugins/svc" ) var _ = Describe("Job E2E Test: Test Job Plugins", func() { @@ -222,16 +227,30 @@ var _ = Describe("Job E2E Test: Test Job Plugins", func() { } Expect(foundVolume).To(BeTrue()) - // Check whether env exists in the pod - for _, container := range pod.Spec.Containers { - for _, envi := range container.Env { - if envi.Name == env.TaskVkIndex { - foundEnv = true - break + // Check whether env exists in the containers and initContainers + containers := pod.Spec.Containers + containers = append(containers, pod.Spec.InitContainers...) + envNames := []string{ + env.TaskVkIndex, + env.TaskIndex, + fmt.Sprintf(svc.EnvTaskHostFmt, strings.ToUpper(taskName)), + fmt.Sprintf(svc.EnvHostNumFmt, strings.ToUpper(taskName)), + } + + for _, container := range containers { + for _, name := range envNames { + foundEnv = false + for _, envi := range container.Env { + if envi.Name == name { + foundEnv = true + break + } } + + Expect(foundEnv).To(BeTrue(), + fmt.Sprint("container: %s, env name: %s", container.Name, name)) } } - Expect(foundEnv).To(BeTrue()) // Check whether service is created with job name _, err = context.kubeclient.CoreV1().Services(job.Namespace).Get(job.Name, v1.GetOptions{})