diff --git a/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go b/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go
index 28fdf7104..b775b089b 100644
--- a/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go
+++ b/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go
@@ -424,9 +424,9 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
 	// deliver pod for aggregation worker
 	aggWorker := job.Spec.AggregationWorker
 
-	// Configure container mounting and Env information by initial runtime.WorkerParam
+	// Configure aggregation worker's mounts and envs
 	var aggPort int32 = 7363
-	aggWorkerParam := new(runtime.WorkerParam)
+	var aggWorkerParam runtime.WorkerParam
 	aggWorkerParam.Env = map[string]string{
 		"NAMESPACE":   job.Namespace,
 		"WORKER_NAME": "aggworker-" + utilrand.String(5),
@@ -451,9 +451,9 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
 	)
 
 	// create aggpod based on configured parameters
-	_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &aggWorker.Template, aggWorkerParam)
+	_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &aggWorker.Template, &aggWorkerParam)
 	if err != nil {
-		return active, err
+		return active, fmt.Errorf("failed to create aggregation worker: %w", err)
 	}
 	active++
 
@@ -463,13 +463,17 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
 	// FIXME(llhuii): only the case that Spec.NodeName specified is support,
 	// will support Spec.NodeSelector.
 	appIP, err = runtime.GetNodeIPByName(c.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName)
+	if err != nil {
+		return active, err
+	}
 
 	aggServicePort, err = runtime.CreateKubernetesService(c.kubeClient, job, jobStageAgg, aggPort, appIP)
 	if err != nil {
 		return active, err
 	}
+
 	// deliver pod for training worker
-	for _, trainingWorker := range job.Spec.TrainingWorkers {
+	for i, trainingWorker := range job.Spec.TrainingWorkers {
 		// get dataseturl through parsing crd of dataset
 		datasetName := trainingWorker.Dataset.Name
 		dataset, err := c.client.Datasets(job.Namespace).Get(ctx, datasetName, metav1.GetOptions{})
@@ -484,9 +488,8 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
 			datasetSecret, _ = c.kubeClient.CoreV1().Secrets(job.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
 		}
 
-		// Configure container mounting and env information
-		workerParam := new(runtime.WorkerParam)
-
+		// Configure training worker's mounts and envs
+		var workerParam runtime.WorkerParam
 		workerParam.Mounts = append(workerParam.Mounts,
 			runtime.WorkerMount{
 				URL: &runtime.MountURL{
@@ -520,10 +523,11 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
 		workerParam.WorkerType = runtime.TrainPodType
 		workerParam.HostNetwork = true
 		workerParam.RestartPolicy = v1.RestartPolicyOnFailure
-		// create train pod based on configured parameters
-		_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &trainingWorker.Template, workerParam)
+
+		// create training worker based on configured parameters
+		_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &trainingWorker.Template, &workerParam)
 		if err != nil {
-			return active, err
+			return active, fmt.Errorf("failed to create %dth training worker: %w", i, err)
 		}
 		active++
 	}
@@ -554,17 +558,27 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			fc.enqueueController(obj, true)
+
+			// when a federated learning job is added,
+			// send it to edge's LC.
 			fc.syncToEdge(watch.Added, obj)
 		},
 		UpdateFunc: func(old, cur interface{}) {
 			fc.enqueueController(cur, true)
+
+			// when a federated learning job is updated,
+			// send it to edge's LC as Added event.
 			fc.syncToEdge(watch.Added, cur)
 		},
 		DeleteFunc: func(obj interface{}) {
 			fc.enqueueController(obj, true)
+
+			// when a federated learning job is deleted,
+			// send it to edge's LC.
 			fc.syncToEdge(watch.Deleted, obj)
 		},
 	})
+
 	fc.jobLister = jobInformer.Lister()
 	fc.jobStoreSynced = jobInformer.Informer().HasSynced
 
diff --git a/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go b/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go
index 95432d14b..b7dfb667a 100644
--- a/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go
+++ b/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go
@@ -619,13 +619,14 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn
 		originalDataURLOrIndex = dataset.Spec.URL
 	}
 
-	var workerParam *runtime.WorkerParam = new(runtime.WorkerParam)
+	var workerParam runtime.WorkerParam
+
 	if podtype == sednav1.ILJobTrain {
 		workerParam.WorkerType = runtime.TrainPodType
 
 		podTemplate = &job.Spec.TrainSpec.Template
 
-		// Env parameters for train
+		// Env parameters for train
 		workerParam.Env = map[string]string{
 			"NAMESPACE": job.Namespace,
 			"JOB_NAME":  job.Name,
@@ -688,10 +689,10 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn
 			},
 		)
 	} else {
+		// Configure eval worker's mounts and envs
 		podTemplate = &job.Spec.EvalSpec.Template
 		workerParam.WorkerType = "Eval"
 
-		// Configure Env information for eval by initial WorkerParam
 		workerParam.Env = map[string]string{
 			"NAMESPACE": job.Namespace,
 			"JOB_NAME":  job.Name,
@@ -757,7 +758,7 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn
 	workerParam.HostNetwork = true
 
 	// create pod based on podtype
-	_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, podTemplate, workerParam)
+	_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, podTemplate, &workerParam)
 	return
 }
 
@@ -771,17 +772,16 @@ func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error {
 	inferModelURL := inferModel.Spec.URL
 
-	// Env parameters for edge
 	HEMParameterJSON, _ := json.Marshal(job.Spec.DeploySpec.HardExampleMining.Parameters)
 	HEMParameterString := string(HEMParameterJSON)
 
-	// Configure container mounting and Env information by initial runtime.WorkerParam
 	modelSecret, err := c.getSecret(
 		job.Namespace,
 		inferModel.Spec.CredentialName,
 		fmt.Sprintf("model %s", inferModel.Name),
 	)
 
+	// Configure inference worker's mounts and envs
 	var workerParam runtime.WorkerParam
 
 	workerParam.Mounts = append(workerParam.Mounts,
 		runtime.WorkerMount{
@@ -809,7 +809,7 @@ func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error {
 	workerParam.WorkerType = runtime.InferencePodType
 	workerParam.HostNetwork = true
 
-	// create edge pod
+	// create the inference worker
 	_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &job.Spec.DeploySpec.Template, &workerParam)
 	return err
 }