Skip to content

Commit

Permalink
gm: clean some code/comments
Browse files Browse the repository at this point in the history
Signed-off-by: llhuii <liulinghui@huawei.com>
  • Loading branch information
llhuii committed Aug 2, 2021
1 parent bdcf2ff commit e9825da
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,9 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
// deliver pod for aggregation worker
aggWorker := job.Spec.AggregationWorker

// Configure container mounting and Env information by initial runtime.WorkerParam
// Configure aggregation worker's mounts and envs
var aggPort int32 = 7363
aggWorkerParam := new(runtime.WorkerParam)
var aggWorkerParam runtime.WorkerParam
aggWorkerParam.Env = map[string]string{
"NAMESPACE": job.Namespace,
"WORKER_NAME": "aggworker-" + utilrand.String(5),
Expand All @@ -451,9 +451,9 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
)

// create aggpod based on configured parameters
_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &aggWorker.Template, aggWorkerParam)
_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &aggWorker.Template, &aggWorkerParam)
if err != nil {
return active, err
return active, fmt.Errorf("failed to create aggregation worker: %w", err)
}
active++

Expand All @@ -463,13 +463,17 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
// FIXME(llhuii): only the case that Spec.NodeName specified is support,
// will support Spec.NodeSelector.
appIP, err = runtime.GetNodeIPByName(c.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName)
if err != nil {
return active, err
}

aggServicePort, err = runtime.CreateKubernetesService(c.kubeClient, job, jobStageAgg, aggPort, appIP)
if err != nil {
return active, err
}

// deliver pod for training worker
for _, trainingWorker := range job.Spec.TrainingWorkers {
for i, trainingWorker := range job.Spec.TrainingWorkers {
// get dataseturl through parsing crd of dataset
datasetName := trainingWorker.Dataset.Name
dataset, err := c.client.Datasets(job.Namespace).Get(ctx, datasetName, metav1.GetOptions{})
Expand All @@ -484,9 +488,8 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
datasetSecret, _ = c.kubeClient.CoreV1().Secrets(job.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
}

// Configure container mounting and env information
workerParam := new(runtime.WorkerParam)

// Configure training worker's mounts and envs
var workerParam runtime.WorkerParam
workerParam.Mounts = append(workerParam.Mounts,
runtime.WorkerMount{
URL: &runtime.MountURL{
Expand Down Expand Up @@ -520,10 +523,11 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
workerParam.WorkerType = runtime.TrainPodType
workerParam.HostNetwork = true
workerParam.RestartPolicy = v1.RestartPolicyOnFailure
// create train pod based on configured parameters
_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &trainingWorker.Template, workerParam)

// create training worker based on configured parameters
_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &trainingWorker.Template, &workerParam)
if err != nil {
return active, err
return active, fmt.Errorf("failed to create %dth training worker: %w", i, err)
}
active++
}
Expand Down Expand Up @@ -554,17 +558,27 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
fc.enqueueController(obj, true)

// when a federated learning job is added,
// send it to edge's LC.
fc.syncToEdge(watch.Added, obj)
},
UpdateFunc: func(old, cur interface{}) {
fc.enqueueController(cur, true)

// when a federated learning job is updated,
// send it to edge's LC as Added event.
fc.syncToEdge(watch.Added, cur)
},
DeleteFunc: func(obj interface{}) {
fc.enqueueController(obj, true)

// when a federated learning job is deleted,
// send it to edge's LC.
fc.syncToEdge(watch.Deleted, obj)
},
})

fc.jobLister = jobInformer.Lister()
fc.jobStoreSynced = jobInformer.Informer().HasSynced

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,14 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn
originalDataURLOrIndex = dataset.Spec.URL
}

var workerParam *runtime.WorkerParam = new(runtime.WorkerParam)
var workerParam runtime.WorkerParam

if podtype == sednav1.ILJobTrain {
workerParam.WorkerType = runtime.TrainPodType

podTemplate = &job.Spec.TrainSpec.Template
// Env parameters for train

// Env parameters for train
workerParam.Env = map[string]string{
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
Expand Down Expand Up @@ -688,10 +689,10 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn
},
)
} else {
// Configure eval worker's mounts and envs
podTemplate = &job.Spec.EvalSpec.Template
workerParam.WorkerType = "Eval"

// Configure Env information for eval by initial WorkerParam
workerParam.Env = map[string]string{
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
Expand Down Expand Up @@ -757,7 +758,7 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn
workerParam.HostNetwork = true

// create pod based on podtype
_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, podTemplate, workerParam)
_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, podTemplate, &workerParam)
return
}

Expand All @@ -771,17 +772,16 @@ func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error {

inferModelURL := inferModel.Spec.URL

// Env parameters for edge
HEMParameterJSON, _ := json.Marshal(job.Spec.DeploySpec.HardExampleMining.Parameters)
HEMParameterString := string(HEMParameterJSON)

// Configure container mounting and Env information by initial runtime.WorkerParam
modelSecret, err := c.getSecret(
job.Namespace,
inferModel.Spec.CredentialName,
fmt.Sprintf("model %s", inferModel.Name),
)

// Configure inference worker's mounts and envs
var workerParam runtime.WorkerParam
workerParam.Mounts = append(workerParam.Mounts,
runtime.WorkerMount{
Expand Down Expand Up @@ -809,7 +809,7 @@ func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error {
workerParam.WorkerType = runtime.InferencePodType
workerParam.HostNetwork = true

// create edge pod
// create the inference worker
_, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &job.Spec.DeploySpec.Template, &workerParam)
return err
}
Expand Down

0 comments on commit e9825da

Please sign in to comment.