gm: more code fixes and cleanup after refactor
1. Remove the redundant feature name from identifiers in all feature
controllers (e.g. 'federatedlearningJob' to 'job'): each feature already
has its own package, so the extra feature prefix is unnecessary.
2. Optimize the upstream interface (a sketch of the new shape follows the
changed-files summary below).
3. Add extra doc strings.

Signed-off-by: llhuii <liulinghui@huawei.com>
llhuii committed Jul 28, 2021
1 parent 5c1c167 commit 3f4e89b
Showing 19 changed files with 310 additions and 320 deletions.
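
The upstream-interface change (item 2 above) replaces each feature controller's private addUpstreamHandler(cc) with a public SetUpstreamHandler(addFunc) method, so controllers no longer reach into cc.UpstreamController themselves. Below is a minimal sketch of the shape implied by the hunks in this commit; the UpstreamHandler type and the exact signatures are assumptions inferred from the diff, not copied from the real runtime package.

package runtime

// UpstreamHandler processes one status message reported from the edge.
// The signature is assumed from the updateFromEdge methods in this commit.
type UpstreamHandler func(name, namespace, operation string, content []byte) error

// UpstreamHandlerAddFunc registers a handler for a given resource kind.
// Feature controllers receive it via SetUpstreamHandler instead of calling
// cc.UpstreamController.Add directly, e.g.:
//
//	func (c *Controller) SetUpstreamHandler(addFunc UpstreamHandlerAddFunc) error {
//		return addFunc(KindName, c.updateFromEdge)
//	}
type UpstreamHandlerAddFunc func(kindName string, handler UpstreamHandler) error
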
4 changes: 4 additions & 0 deletions cmd/sedna-gm/sedna-gm.go
@@ -17,14 +17,18 @@ limitations under the License.
package main

import (
"math/rand"
"os"
"time"

"k8s.io/component-base/logs"

"github.com/kubeedge/sedna/cmd/sedna-gm/app"
)

func main() {
rand.Seed(time.Now().UnixNano())

command := app.NewControllerCommand()
logs.InitLogs()
defer logs.FlushLogs()
2 changes: 0 additions & 2 deletions pkg/globalmanager/controllers/dataset/dataset.go
@@ -70,7 +70,5 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
},
})

c.addUpstreamHandler(cc)

return c, nil
}
1 change: 0 additions & 1 deletion pkg/globalmanager/controllers/dataset/downstream.go
@@ -52,6 +52,5 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro

func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error {
c.sendToEdgeFunc = f

return nil
}
4 changes: 2 additions & 2 deletions pkg/globalmanager/controllers/dataset/upstream.go
@@ -57,6 +57,6 @@ func (c *Controller) updateStatus(name, namespace string, status sednav1.Dataset
})
}

func (c *Controller) addUpstreamHandler(cc *runtime.ControllerContext) error {
return cc.UpstreamController.Add(KindName, c.updateFromEdge)
func (c *Controller) SetUpstreamHandler(addFunc runtime.UpstreamHandlerAddFunc) error {
return addFunc(KindName, c.updateFromEdge)
}
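
With SetUpstreamHandler, the wiring direction flips: the global manager injects its upstream controller's add function into each feature controller, instead of each controller pulling it from the ControllerContext. The following is a hypothetical wiring helper; only SetUpstreamHandler, UpstreamHandlerAddFunc and FeatureControllerI appear in this diff, everything else (including the elided runtime package import) is assumed.

// registerUpstreamHandlers sketches how the global manager could wire the
// feature controllers after this refactor (hypothetical helper).
func registerUpstreamHandlers(addFunc runtime.UpstreamHandlerAddFunc, controllers []runtime.FeatureControllerI) error {
    for _, fc := range controllers {
        // Only controllers that report status from the edge implement the setter.
        setter, ok := fc.(interface {
            SetUpstreamHandler(runtime.UpstreamHandlerAddFunc) error
        })
        if !ok {
            continue
        }
        if err := setter.SetUpstreamHandler(addFunc); err != nil {
            return err
        }
    }
    return nil
}
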
@@ -54,14 +54,14 @@ const (
)

const (
FLJobStageAgg = "Aggregation"
FLJobStageTrain = "Training"
jobStageAgg = "Aggregation"
jobStageTrain = "Training"
)

// Kind contains the schema.GroupVersionKind for this controller type.
var Kind = sednav1.SchemeGroupVersion.WithKind(KindName)

// Controller ensures that all FLJob objects have corresponding pods to
// Controller ensures that all FederatedLearningJob objects have corresponding pods to
// run their configured workload.
type Controller struct {
kubeClient kubernetes.Interface
@@ -70,7 +70,7 @@ type Controller struct {
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
// jobStoreSynced returns true if the flJob store has been synced at least once.
// jobStoreSynced returns true if the FederatedLearningJob store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
jobStoreSynced cache.InformerSynced

@@ -256,88 +256,93 @@ func (c *Controller) sync(key string) (bool, error) {
sharedJob, err := c.jobLister.FederatedLearningJobs(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
klog.V(4).Infof("FLJob has been deleted: %v", key)
klog.V(4).Infof("%s %v has been deleted", Name, key)
return true, nil
}
return false, err
}
flJob := *sharedJob
// set kind for flJob in case that the kind is None
flJob.SetGroupVersionKind(sednav1.SchemeGroupVersion.WithKind("FederatedLearningJob"))
// if flJob was finished previously, we don't want to redo the termination
if IsFLJobFinished(&flJob) {

job := *sharedJob
// set kind for FederatedLearningJob in case that the kind is None
job.SetGroupVersionKind(Kind)

// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
return true, nil
}
selector, _ := runtime.GenerateSelector(&flJob)
pods, err := c.podStore.Pods(flJob.Namespace).List(selector)

selector, _ := runtime.GenerateSelector(&job)
pods, err := c.podStore.Pods(job.Namespace).List(selector)
if err != nil {
return false, err
}

activePods := k8scontroller.FilterActivePods(pods)
active := int32(len(activePods))
succeeded, failed := getStatus(pods)
conditions := len(flJob.Status.Conditions)
// flJob first start
if flJob.Status.StartTime == nil {
succeeded, failed := countPods(pods)
conditions := len(job.Status.Conditions)

// set StartTime when job is handled firstly
if job.Status.StartTime == nil {
now := metav1.Now()
flJob.Status.StartTime = &now
job.Status.StartTime = &now
}

var manageJobErr error
jobFailed := false
var failureReason string
var failureMessage string
phase := flJob.Status.Phase
phase := job.Status.Phase

if failed > 0 {
jobFailed = true
failureReason = "workerFailed"
failureMessage = "the worker of FLJob failed"
failureMessage = "the worker of FederatedLearningJob failed"
}

if jobFailed {
flJob.Status.Conditions = append(flJob.Status.Conditions, NewFLJobCondition(sednav1.FLJobCondFailed, failureReason, failureMessage))
flJob.Status.Phase = sednav1.FLJobFailed
c.recorder.Event(&flJob, v1.EventTypeWarning, failureReason, failureMessage)
job.Status.Conditions = append(job.Status.Conditions, NewJobCondition(sednav1.FLJobCondFailed, failureReason, failureMessage))
job.Status.Phase = sednav1.FLJobFailed
c.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
} else {
// in the First time, we create the pods
if len(pods) == 0 {
active, manageJobErr = c.createPod(&flJob)
active, manageJobErr = c.createPod(&job)
}
complete := false
if succeeded > 0 && active == 0 {
complete = true
}
if complete {
flJob.Status.Conditions = append(flJob.Status.Conditions, NewFLJobCondition(sednav1.FLJobCondComplete, "", ""))
job.Status.Conditions = append(job.Status.Conditions, NewJobCondition(sednav1.FLJobCondComplete, "", ""))
now := metav1.Now()
flJob.Status.CompletionTime = &now
c.recorder.Event(&flJob, v1.EventTypeNormal, "Completed", "FLJob completed")
flJob.Status.Phase = sednav1.FLJobSucceeded
job.Status.CompletionTime = &now
c.recorder.Event(&job, v1.EventTypeNormal, "Completed", "FederatedLearningJob completed")
job.Status.Phase = sednav1.FLJobSucceeded
} else {
flJob.Status.Phase = sednav1.FLJobRunning
job.Status.Phase = sednav1.FLJobRunning
}
}

forget := false
// Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true
// This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to
// improve the FLJob backoff policy when parallelism > 1 and few FLJobs failed but others succeed.
// improve the job backoff policy when parallelism > 1 and few FLJobs failed but others succeed.
// In this case, we should clear the backoff delay.
if flJob.Status.Succeeded < succeeded {
if job.Status.Succeeded < succeeded {
forget = true
}

// no need to update the flJob if the status hasn't changed since last time
if flJob.Status.Active != active || flJob.Status.Succeeded != succeeded || flJob.Status.Failed != failed || len(flJob.Status.Conditions) != conditions || flJob.Status.Phase != phase {
flJob.Status.Active = active
flJob.Status.Succeeded = succeeded
flJob.Status.Failed = failed
// no need to update the job if the status hasn't changed since last time
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions || job.Status.Phase != phase {
job.Status.Active = active
job.Status.Succeeded = succeeded
job.Status.Failed = failed
c.updateJobStatus(&job)

if jobFailed && !IsFLJobFinished(&flJob) {
// returning an error will re-enqueue FLJob after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for flJob key %q", key)
if jobFailed && !IsJobFinished(&job) {
// returning an error will re-enqueue FederatedLearningJob after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for FederatedLearningJob key %q", key)
}

forget = true
@@ -346,7 +351,7 @@ func (c *Controller) sync(key string) (bool, error) {
return forget, manageJobErr
}

func NewFLJobCondition(conditionType sednav1.FLJobConditionType, reason, message string) sednav1.FLJobCondition {
func NewJobCondition(conditionType sednav1.FLJobConditionType, reason, message string) sednav1.FLJobCondition {
return sednav1.FLJobCondition{
Type: conditionType,
Status: v1.ConditionTrue,
@@ -357,28 +362,24 @@ func NewFLJobCondition(conditionType sednav1.FLJobConditionType, reason, message
}
}

// getStatus returns no of succeeded and failed pods running a flJob
func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
// countPods returns number of succeeded and failed pods
func countPods(pods []*v1.Pod) (succeeded, failed int32) {
succeeded = int32(filterPods(pods, v1.PodSucceeded))
failed = int32(filterPods(pods, v1.PodFailed))
return
}

func (c *Controller) updateFLJobStatus(flJob *sednav1.FederatedLearningJob) error {
jobClient := c.client.FederatedLearningJobs(flJob.Namespace)
var err error
for i := 0; i <= runtime.ResourceUpdateRetries; i = i + 1 {
var newFLJob *sednav1.FederatedLearningJob
newFLJob, err = jobClient.Get(context.TODO(), flJob.Name, metav1.GetOptions{})
func (c *Controller) updateJobStatus(job *sednav1.FederatedLearningJob) error {
jobClient := c.client.FederatedLearningJobs(job.Namespace)
return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error {
newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{})
if err != nil {
break
return err
}
newFLJob.Status = flJob.Status
if _, err = jobClient.UpdateStatus(context.TODO(), newFLJob, metav1.UpdateOptions{}); err == nil {
break
}
}
return nil
newJob.Status = job.Status
_, err = jobClient.UpdateStatus(context.TODO(), newJob, metav1.UpdateOptions{})
return err
})
}

// filterPods returns pods based on their phase.
@@ -392,7 +393,7 @@ func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
return result
}

func IsFLJobFinished(j *sednav1.FederatedLearningJob) bool {
func IsJobFinished(j *sednav1.FederatedLearningJob) bool {
for _, c := range j.Status.Conditions {
if (c.Type == sednav1.FLJobCondComplete || c.Type == sednav1.FLJobCondFailed) && c.Status == v1.ConditionTrue {
return true
@@ -435,7 +436,7 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
"PARTICIPANTS_COUNT": participantsCount,
}

aggWorkerParam.WorkerType = FLJobStageAgg
aggWorkerParam.WorkerType = jobStageAgg
aggWorkerParam.RestartPolicy = v1.RestartPolicyOnFailure

aggWorkerParam.Mounts = append(aggWorkerParam.Mounts,
@@ -463,7 +464,7 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
// will support Spec.NodeSelector.
appIP, err = runtime.GetNodeIPByName(c.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName)

aggServicePort, err = runtime.CreateKubernetesService(c.kubeClient, job, FLJobStageAgg, aggPort, appIP)
aggServicePort, err = runtime.CreateKubernetesService(c.kubeClient, job, jobStageAgg, aggPort, appIP)
if err != nil {
return active, err
}
@@ -545,8 +546,8 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
kubeClient: cc.KubeClient,
client: cc.SednaClient.SednaV1alpha1(),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), "flJob"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "flJob-controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: Name + "-controller"}),
cfg: cfg,
}

@@ -575,7 +576,5 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
fc.podStore = podInformer.Lister()
fc.podStoreSynced = podInformer.Informer().HasSynced

fc.addUpstreamHandler(cc)

return fc, nil
}
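
The updateJobStatus hunk above swaps an inline retry loop for runtime.RetryUpdateStatus. The following is a plausible reconstruction of that helper from the removed loop; the actual implementation in the runtime package is not part of this diff and may differ.

package runtime

import "k8s.io/klog/v2"

// ResourceUpdateRetries already exists in this package (the removed loop
// referenced it); the value here is an assumption for the sketch.
const ResourceUpdateRetries = 3

// RetryUpdateStatus retries the given status-update closure a bounded number
// of times, returning the last error if all attempts fail.
func RetryUpdateStatus(name, namespace string, updateStatus func() error) error {
    var err error
    for i := 0; i <= ResourceUpdateRetries; i++ {
        if err = updateStatus(); err == nil {
            return nil
        }
        klog.Warningf("failed to update status of %s/%s (attempt %d): %v", namespace, name, i+1, err)
    }
    return err
}
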
6 changes: 3 additions & 3 deletions pkg/globalmanager/controllers/federatedlearning/upstream.go
@@ -110,14 +110,14 @@ func (c *Controller) updateFromEdge(name, namespace, operation string, content [
// TODO: more meaningful reason/message
reason := "DoTraining"
message := fmt.Sprintf("Round %v reaches at %s", jobInfo.CurrentRound, jobInfo.UpdateTime)
cond := NewFLJobCondition(sednav1.FLJobCondTraining, reason, message)
cond := NewJobCondition(sednav1.FLJobCondTraining, reason, message)
c.appendStatusCondition(name, namespace, cond)
}
}

return nil
}

func (c *Controller) addUpstreamHandler(cc *runtime.ControllerContext) error {
return cc.UpstreamController.Add(KindName, c.updateFromEdge)
func (c *Controller) SetUpstreamHandler(addFunc runtime.UpstreamHandlerAddFunc) error {
return addFunc(KindName, c.updateFromEdge)
}
@@ -133,7 +133,6 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro
}

return nil

}

func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error {