diff --git a/cmd/sedna-gm/sedna-gm.go b/cmd/sedna-gm/sedna-gm.go index bce60eca2..3777a617c 100644 --- a/cmd/sedna-gm/sedna-gm.go +++ b/cmd/sedna-gm/sedna-gm.go @@ -17,7 +17,9 @@ limitations under the License. package main import ( + "math/rand" "os" + "time" "k8s.io/component-base/logs" @@ -25,6 +27,8 @@ import ( ) func main() { + rand.Seed(time.Now().UnixNano()) + command := app.NewControllerCommand() logs.InitLogs() defer logs.FlushLogs() diff --git a/pkg/globalmanager/controllers/dataset/dataset.go b/pkg/globalmanager/controllers/dataset/dataset.go index 1de420c99..8523057c5 100644 --- a/pkg/globalmanager/controllers/dataset/dataset.go +++ b/pkg/globalmanager/controllers/dataset/dataset.go @@ -70,7 +70,5 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { }, }) - c.addUpstreamHandler(cc) - return c, nil } diff --git a/pkg/globalmanager/controllers/dataset/downstream.go b/pkg/globalmanager/controllers/dataset/downstream.go index 8f9553fba..a898fac0c 100644 --- a/pkg/globalmanager/controllers/dataset/downstream.go +++ b/pkg/globalmanager/controllers/dataset/downstream.go @@ -32,19 +32,17 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro return nil } + // Since t.Kind may be empty, + // we need to fix the kind here if missing. + // more details at https://github.com/kubernetes/kubernetes/issues/3030 + dataset.Kind = KindName + // Here only propagate to the nodes with non empty name nodeName := dataset.Spec.NodeName if len(nodeName) == 0 { return fmt.Errorf("empty node name") } - // Since t.Kind may be empty, - // we need to fix the kind here if missing. - // more details at https://github.com/kubernetes/kubernetes/issues/3030 - if len(dataset.Kind) == 0 { - dataset.Kind = KindName - } - runtime.InjectSecretAnnotations(c.kubeClient, dataset, dataset.Spec.CredentialName) return c.sendToEdgeFunc(nodeName, eventType, dataset) @@ -52,6 +50,5 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error { c.sendToEdgeFunc = f - return nil } diff --git a/pkg/globalmanager/controllers/dataset/upstream.go b/pkg/globalmanager/controllers/dataset/upstream.go index 26a9feaa1..a1b1949e0 100644 --- a/pkg/globalmanager/controllers/dataset/upstream.go +++ b/pkg/globalmanager/controllers/dataset/upstream.go @@ -57,6 +57,6 @@ func (c *Controller) updateStatus(name, namespace string, status sednav1.Dataset }) } -func (c *Controller) addUpstreamHandler(cc *runtime.ControllerContext) error { - return cc.UpstreamController.Add(KindName, c.updateFromEdge) +func (c *Controller) SetUpstreamHandler(addFunc runtime.UpstreamHandlerAddFunc) error { + return addFunc(KindName, c.updateFromEdge) } diff --git a/pkg/globalmanager/controllers/federatedlearning/downstream.go b/pkg/globalmanager/controllers/federatedlearning/downstream.go index 9a50a8ecc..3b5f2fd22 100644 --- a/pkg/globalmanager/controllers/federatedlearning/downstream.go +++ b/pkg/globalmanager/controllers/federatedlearning/downstream.go @@ -29,6 +29,11 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro return nil } + // Since Kind may be empty, + // we need to fix the kind here if missing. 
+ // more details at https://github.com/kubernetes/kubernetes/issues/3030 + job.Kind = KindName + // broadcast to all nodes specified in spec nodeset := make(map[string]bool) for _, trainingWorker := range job.Spec.TrainingWorkers { diff --git a/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go b/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go index d3730a0fe..b775b089b 100644 --- a/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go +++ b/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go @@ -54,14 +54,14 @@ const ( ) const ( - FLJobStageAgg = "Aggregation" - FLJobStageTrain = "Training" + jobStageAgg = "Aggregation" + jobStageTrain = "Training" ) // Kind contains the schema.GroupVersionKind for this controller type. var Kind = sednav1.SchemeGroupVersion.WithKind(KindName) -// Controller ensures that all FLJob objects have corresponding pods to +// Controller ensures that all FederatedLearningJob objects have corresponding pods to // run their configured workload. type Controller struct { kubeClient kubernetes.Interface @@ -70,7 +70,7 @@ type Controller struct { // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced cache.InformerSynced - // jobStoreSynced returns true if the flJob store has been synced at least once. + // jobStoreSynced returns true if the FederatedLearningJob store has been synced at least once. // Added as a member to the struct to allow injection for testing. jobStoreSynced cache.InformerSynced @@ -256,88 +256,93 @@ func (c *Controller) sync(key string) (bool, error) { sharedJob, err := c.jobLister.FederatedLearningJobs(ns).Get(name) if err != nil { if errors.IsNotFound(err) { - klog.V(4).Infof("FLJob has been deleted: %v", key) + klog.V(4).Infof("%s %v has been deleted", Name, key) return true, nil } return false, err } - flJob := *sharedJob - // set kind for flJob in case that the kind is None - flJob.SetGroupVersionKind(sednav1.SchemeGroupVersion.WithKind("FederatedLearningJob")) - // if flJob was finished previously, we don't want to redo the termination - if IsFLJobFinished(&flJob) { + + job := *sharedJob + // set kind for FederatedLearningJob in case that the kind is None + job.SetGroupVersionKind(Kind) + + // if job was finished previously, we don't want to redo the termination + if IsJobFinished(&job) { return true, nil } - selector, _ := runtime.GenerateSelector(&flJob) - pods, err := c.podStore.Pods(flJob.Namespace).List(selector) + + selector, _ := runtime.GenerateSelector(&job) + pods, err := c.podStore.Pods(job.Namespace).List(selector) if err != nil { return false, err } activePods := k8scontroller.FilterActivePods(pods) active := int32(len(activePods)) - succeeded, failed := getStatus(pods) - conditions := len(flJob.Status.Conditions) - // flJob first start - if flJob.Status.StartTime == nil { + succeeded, failed := countPods(pods) + conditions := len(job.Status.Conditions) + + // set StartTime when job is handled firstly + if job.Status.StartTime == nil { now := metav1.Now() - flJob.Status.StartTime = &now + job.Status.StartTime = &now } var manageJobErr error jobFailed := false var failureReason string var failureMessage string - phase := flJob.Status.Phase + phase := job.Status.Phase if failed > 0 { jobFailed = true failureReason = "workerFailed" - failureMessage = "the worker of FLJob failed" + failureMessage = "the worker of FederatedLearningJob failed" } if 
jobFailed { - flJob.Status.Conditions = append(flJob.Status.Conditions, NewFLJobCondition(sednav1.FLJobCondFailed, failureReason, failureMessage)) - flJob.Status.Phase = sednav1.FLJobFailed - c.recorder.Event(&flJob, v1.EventTypeWarning, failureReason, failureMessage) + job.Status.Conditions = append(job.Status.Conditions, NewJobCondition(sednav1.FLJobCondFailed, failureReason, failureMessage)) + job.Status.Phase = sednav1.FLJobFailed + c.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage) } else { // in the First time, we create the pods if len(pods) == 0 { - active, manageJobErr = c.createPod(&flJob) + active, manageJobErr = c.createPod(&job) } complete := false if succeeded > 0 && active == 0 { complete = true } if complete { - flJob.Status.Conditions = append(flJob.Status.Conditions, NewFLJobCondition(sednav1.FLJobCondComplete, "", "")) + job.Status.Conditions = append(job.Status.Conditions, NewJobCondition(sednav1.FLJobCondComplete, "", "")) now := metav1.Now() - flJob.Status.CompletionTime = &now - c.recorder.Event(&flJob, v1.EventTypeNormal, "Completed", "FLJob completed") - flJob.Status.Phase = sednav1.FLJobSucceeded + job.Status.CompletionTime = &now + c.recorder.Event(&job, v1.EventTypeNormal, "Completed", "FederatedLearningJob completed") + job.Status.Phase = sednav1.FLJobSucceeded } else { - flJob.Status.Phase = sednav1.FLJobRunning + job.Status.Phase = sednav1.FLJobRunning } } forget := false // Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true // This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to - // improve the FLJob backoff policy when parallelism > 1 and few FLJobs failed but others succeed. + // improve the job backoff policy when parallelism > 1 and few FLJobs failed but others succeed. // In this case, we should clear the backoff delay. 
- if flJob.Status.Succeeded < succeeded { + if job.Status.Succeeded < succeeded { forget = true } - // no need to update the flJob if the status hasn't changed since last time - if flJob.Status.Active != active || flJob.Status.Succeeded != succeeded || flJob.Status.Failed != failed || len(flJob.Status.Conditions) != conditions || flJob.Status.Phase != phase { - flJob.Status.Active = active - flJob.Status.Succeeded = succeeded - flJob.Status.Failed = failed + // no need to update the job if the status hasn't changed since last time + if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions || job.Status.Phase != phase { + job.Status.Active = active + job.Status.Succeeded = succeeded + job.Status.Failed = failed + c.updateJobStatus(&job) - if jobFailed && !IsFLJobFinished(&flJob) { - // returning an error will re-enqueue FLJob after the backoff period - return forget, fmt.Errorf("failed pod(s) detected for flJob key %q", key) + if jobFailed && !IsJobFinished(&job) { + // returning an error will re-enqueue FederatedLearningJob after the backoff period + return forget, fmt.Errorf("failed pod(s) detected for FederatedLearningJob key %q", key) } forget = true @@ -346,7 +351,7 @@ func (c *Controller) sync(key string) (bool, error) { return forget, manageJobErr } -func NewFLJobCondition(conditionType sednav1.FLJobConditionType, reason, message string) sednav1.FLJobCondition { +func NewJobCondition(conditionType sednav1.FLJobConditionType, reason, message string) sednav1.FLJobCondition { return sednav1.FLJobCondition{ Type: conditionType, Status: v1.ConditionTrue, @@ -357,28 +362,24 @@ func NewFLJobCondition(conditionType sednav1.FLJobConditionType, reason, message } } -// getStatus returns no of succeeded and failed pods running a flJob -func getStatus(pods []*v1.Pod) (succeeded, failed int32) { +// countPods returns number of succeeded and failed pods +func countPods(pods []*v1.Pod) (succeeded, failed int32) { succeeded = int32(filterPods(pods, v1.PodSucceeded)) failed = int32(filterPods(pods, v1.PodFailed)) return } -func (c *Controller) updateFLJobStatus(flJob *sednav1.FederatedLearningJob) error { - jobClient := c.client.FederatedLearningJobs(flJob.Namespace) - var err error - for i := 0; i <= runtime.ResourceUpdateRetries; i = i + 1 { - var newFLJob *sednav1.FederatedLearningJob - newFLJob, err = jobClient.Get(context.TODO(), flJob.Name, metav1.GetOptions{}) +func (c *Controller) updateJobStatus(job *sednav1.FederatedLearningJob) error { + jobClient := c.client.FederatedLearningJobs(job.Namespace) + return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error { + newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) if err != nil { - break - } - newFLJob.Status = flJob.Status - if _, err = jobClient.UpdateStatus(context.TODO(), newFLJob, metav1.UpdateOptions{}); err == nil { - break + return err } - } - return nil + newJob.Status = job.Status + _, err = jobClient.UpdateStatus(context.TODO(), newJob, metav1.UpdateOptions{}) + return err + }) } // filterPods returns pods based on their phase. 
@@ -392,7 +393,7 @@ func filterPods(pods []*v1.Pod, phase v1.PodPhase) int { return result } -func IsFLJobFinished(j *sednav1.FederatedLearningJob) bool { +func IsJobFinished(j *sednav1.FederatedLearningJob) bool { for _, c := range j.Status.Conditions { if (c.Type == sednav1.FLJobCondComplete || c.Type == sednav1.FLJobCondFailed) && c.Status == v1.ConditionTrue { return true @@ -423,9 +424,9 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32, // deliver pod for aggregation worker aggWorker := job.Spec.AggregationWorker - // Configure container mounting and Env information by initial runtime.WorkerParam + // Configure aggregation worker's mounts and envs var aggPort int32 = 7363 - aggWorkerParam := new(runtime.WorkerParam) + var aggWorkerParam runtime.WorkerParam aggWorkerParam.Env = map[string]string{ "NAMESPACE": job.Namespace, "WORKER_NAME": "aggworker-" + utilrand.String(5), @@ -435,7 +436,7 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32, "PARTICIPANTS_COUNT": participantsCount, } - aggWorkerParam.WorkerType = FLJobStageAgg + aggWorkerParam.WorkerType = jobStageAgg aggWorkerParam.RestartPolicy = v1.RestartPolicyOnFailure aggWorkerParam.Mounts = append(aggWorkerParam.Mounts, @@ -450,9 +451,9 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32, ) // create aggpod based on configured parameters - _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &aggWorker.Template, aggWorkerParam) + _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &aggWorker.Template, &aggWorkerParam) if err != nil { - return active, err + return active, fmt.Errorf("failed to create aggregation worker: %w", err) } active++ @@ -462,13 +463,17 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32, // FIXME(llhuii): only the case that Spec.NodeName specified is support, // will support Spec.NodeSelector. 
appIP, err = runtime.GetNodeIPByName(c.kubeClient, job.Spec.AggregationWorker.Template.Spec.NodeName) + if err != nil { + return active, err + } - aggServicePort, err = runtime.CreateKubernetesService(c.kubeClient, job, FLJobStageAgg, aggPort, appIP) + aggServicePort, err = runtime.CreateKubernetesService(c.kubeClient, job, jobStageAgg, aggPort, appIP) if err != nil { return active, err } + // deliver pod for training worker - for _, trainingWorker := range job.Spec.TrainingWorkers { + for i, trainingWorker := range job.Spec.TrainingWorkers { // get dataseturl through parsing crd of dataset datasetName := trainingWorker.Dataset.Name dataset, err := c.client.Datasets(job.Namespace).Get(ctx, datasetName, metav1.GetOptions{}) @@ -483,9 +488,8 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32, datasetSecret, _ = c.kubeClient.CoreV1().Secrets(job.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{}) } - // Configure container mounting and env information - workerParam := new(runtime.WorkerParam) - + // Configure training worker's mounts and envs + var workerParam runtime.WorkerParam workerParam.Mounts = append(workerParam.Mounts, runtime.WorkerMount{ URL: &runtime.MountURL{ @@ -519,10 +523,11 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32, workerParam.WorkerType = runtime.TrainPodType workerParam.HostNetwork = true workerParam.RestartPolicy = v1.RestartPolicyOnFailure - // create train pod based on configured parameters - _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &trainingWorker.Template, workerParam) + + // create training worker based on configured parameters + _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &trainingWorker.Template, &workerParam) if err != nil { - return active, err + return active, fmt.Errorf("failed to create %dth training worker: %w", i, err) } active++ } @@ -545,25 +550,35 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { kubeClient: cc.KubeClient, client: cc.SednaClient.SednaV1alpha1(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), "flJob"), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "flJob-controller"}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: Name + "-controller"}), cfg: cfg, } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { fc.enqueueController(obj, true) + + // when a federated learning job is added, + // send it to edge's LC. fc.syncToEdge(watch.Added, obj) }, UpdateFunc: func(old, cur interface{}) { fc.enqueueController(cur, true) + + // when a federated learning job is updated, + // send it to edge's LC as Added event. fc.syncToEdge(watch.Added, cur) }, DeleteFunc: func(obj interface{}) { fc.enqueueController(obj, true) + + // when a federated learning job is deleted, + // send it to edge's LC. 
fc.syncToEdge(watch.Deleted, obj) }, }) + fc.jobLister = jobInformer.Lister() fc.jobStoreSynced = jobInformer.Informer().HasSynced @@ -575,7 +590,5 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { fc.podStore = podInformer.Lister() fc.podStoreSynced = podInformer.Informer().HasSynced - fc.addUpstreamHandler(cc) - return fc, nil } diff --git a/pkg/globalmanager/controllers/federatedlearning/upstream.go b/pkg/globalmanager/controllers/federatedlearning/upstream.go index 0bcba81e7..01888a6d2 100644 --- a/pkg/globalmanager/controllers/federatedlearning/upstream.go +++ b/pkg/globalmanager/controllers/federatedlearning/upstream.go @@ -110,7 +110,7 @@ func (c *Controller) updateFromEdge(name, namespace, operation string, content [ // TODO: more meaningful reason/message reason := "DoTraining" message := fmt.Sprintf("Round %v reaches at %s", jobInfo.CurrentRound, jobInfo.UpdateTime) - cond := NewFLJobCondition(sednav1.FLJobCondTraining, reason, message) + cond := NewJobCondition(sednav1.FLJobCondTraining, reason, message) c.appendStatusCondition(name, namespace, cond) } } @@ -118,6 +118,6 @@ func (c *Controller) updateFromEdge(name, namespace, operation string, content [ return nil } -func (c *Controller) addUpstreamHandler(cc *runtime.ControllerContext) error { - return cc.UpstreamController.Add(KindName, c.updateFromEdge) +func (c *Controller) SetUpstreamHandler(addFunc runtime.UpstreamHandlerAddFunc) error { + return addFunc(KindName, c.updateFromEdge) } diff --git a/pkg/globalmanager/controllers/incrementallearning/downstream.go b/pkg/globalmanager/controllers/incrementallearning/downstream.go index cba0b1365..a53da8cbe 100644 --- a/pkg/globalmanager/controllers/incrementallearning/downstream.go +++ b/pkg/globalmanager/controllers/incrementallearning/downstream.go @@ -55,6 +55,10 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro if !ok { return nil } + + // Since Kind may be empty, + // we need to fix the kind here if missing. + // more details at https://github.com/kubernetes/kubernetes/issues/3030 job.Kind = KindName jobConditions := job.Status.Conditions @@ -133,7 +137,6 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro } return nil - } func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error { diff --git a/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go b/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go index 5e88232f3..f1d792aef 100644 --- a/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go +++ b/pkg/globalmanager/controllers/incrementallearning/incrementallearningjob.go @@ -32,7 +32,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -68,7 +67,7 @@ type Controller struct { // podStoreSynced returns true if the pod store has been synced at least once. // Added as a member to the struct to allow injection for testing. podStoreSynced cache.InformerSynced - // jobStoreSynced returns true if the incrementaljob store has been synced at least once. + // jobStoreSynced returns true if the job store has been synced at least once. // Added as a member to the struct to allow injection for testing. 
jobStoreSynced cache.InformerSynced @@ -81,8 +80,6 @@ type Controller struct { // IncrementalLearningJobs that need to be updated queue workqueue.RateLimitingInterface - recorder record.EventRecorder - cfg *config.ControllerConfig sendToEdgeFunc runtime.DownstreamSendFunc @@ -104,6 +101,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { return } + klog.Infof("Starting %s job workers", Name) for i := 0; i < workers; i++ { go wait.Until(c.worker, time.Second, stopCh) @@ -252,7 +250,8 @@ func (c *Controller) sync(key string) (bool, error) { if len(ns) == 0 || len(name) == 0 { return false, fmt.Errorf("invalid incrementallearning job key %q: either namespace or name is missing", key) } - sharedIncrementalJob, err := c.jobLister.IncrementalLearningJobs(ns).Get(name) + + sharedJob, err := c.jobLister.IncrementalLearningJobs(ns).Get(name) if err != nil { if errors.IsNotFound(err) { klog.V(4).Infof("incrementallearning job has been deleted: %v", key) @@ -260,19 +259,21 @@ func (c *Controller) sync(key string) (bool, error) { } return false, err } - incrementaljob := *sharedIncrementalJob - // set kind for incrementaljob in case that the kind is None - incrementaljob.SetGroupVersionKind(sednav1.SchemeGroupVersion.WithKind("IncrementalLearningJob")) - // incrementaljob first start, create pod for inference - if incrementaljob.Status.StartTime == nil { + + job := *sharedJob + // set kind in case that the kind is None + job.SetGroupVersionKind(Kind) + + // when job is handled at first, create pod for inference + if job.Status.StartTime == nil { now := metav1.Now() - incrementaljob.Status.StartTime = &now - pod := c.getSpecifiedPods(&incrementaljob, runtime.InferencePodType) + job.Status.StartTime = &now + pod := c.getSpecifiedPods(&job, runtime.InferencePodType) if pod == nil { - err = c.createInferPod(&incrementaljob) + err = c.createInferPod(&job) } else { if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodPending { - err = c.createInferPod(&incrementaljob) + err = c.createInferPod(&job) } } if err != nil { @@ -280,8 +281,8 @@ func (c *Controller) sync(key string) (bool, error) { } } - // if incrementaljob was finished previously, we don't want to redo the termination - if IsIncrementalJobFinished(&incrementaljob) { + // if job was finished previously, we don't want to redo the termination + if IsJobFinished(&job) { return true, nil } @@ -289,20 +290,20 @@ func (c *Controller) sync(key string) (bool, error) { jobFailed := false needUpdated := false - // update conditions of incremental job - needUpdated, err = c.updateIncrementalJobConditions(&incrementaljob) + // transit this job's state machine + needUpdated, err = c.transitJobState(&job) if err != nil { - klog.V(2).Infof("incrementallearning job %v/%v faied to be updated, err:%s", incrementaljob.Namespace, incrementaljob.Name, err) + klog.V(2).Infof("incrementallearning job %v/%v failed to be updated, err:%s", job.Namespace, job.Name, err) } if needUpdated { - if err := c.updateIncrementalJobStatus(&incrementaljob); err != nil { + if err := c.updateJobStatus(&job); err != nil { return forget, err } - if jobFailed && !IsIncrementalJobFinished(&incrementaljob) { - // returning an error will re-enqueue IncrementalJob after the backoff period - return forget, fmt.Errorf("failed pod(s) detected for incrementaljob key %q", key) + if jobFailed && !IsJobFinished(&job) { + // returning an error will re-enqueue IncrementalLearningJob after the backoff period + return forget, fmt.Errorf("failed pod(s) detected for incrementallearning job 
key %q", key) } forget = true @@ -317,61 +318,56 @@ func (c *Controller) setWorkerNodeNameOfJob(job *sednav1.IncrementalLearningJob, key := runtime.AnnotationsKeyPrefix + jobStage ann := job.GetAnnotations() - if ann != nil { - if ann[key] == nodeName { - // already set - return nil - } + if ann[key] == nodeName { + // already set + return nil } + dataStr := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, nodeName) jobClient := c.client.IncrementalLearningJobs(job.Namespace) - var err error - for i := 0; i <= runtime.ResourceUpdateRetries; i++ { - var newJob *sednav1.IncrementalLearningJob - newJob, err = jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) + return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error { + newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) if err != nil { - break + return err } annotations := newJob.GetAnnotations() - if annotations != nil { - if annotations[key] == nodeName { - return nil - } - } - - dataStr := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, key, nodeName) - if _, err = jobClient.Patch(context.TODO(), job.Name, types.MergePatchType, []byte(dataStr), metav1.PatchOptions{}); err == nil { - break + if annotations[key] == nodeName { + return nil } - } - return err + _, err = jobClient.Patch(context.TODO(), job.Name, types.MergePatchType, []byte(dataStr), metav1.PatchOptions{}) + return err + }) } -// updateIncrementalJobConditions ensures that conditions of incrementallearning job can be changed by podstatus -func (c *Controller) updateIncrementalJobConditions(incrementaljob *sednav1.IncrementalLearningJob) (bool, error) { +// transitJobState transit job to next state +func (c *Controller) transitJobState(job *sednav1.IncrementalLearningJob) (bool, error) { var initialType sednav1.ILJobStageConditionType var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{ Stage: sednav1.ILJobTrain, Type: initialType, } + var newConditionType sednav1.ILJobStageConditionType var needUpdated = false - jobConditions := incrementaljob.Status.Conditions + var podStatus v1.PodPhase = v1.PodUnknown var pod *v1.Pod + + jobConditions := job.Status.Conditions if len(jobConditions) > 0 { // get latest pod and pod status latestCondition = (jobConditions)[len(jobConditions)-1] - klog.V(2).Infof("incrementallearning job %v/%v latest stage %v:", incrementaljob.Namespace, incrementaljob.Name, + klog.V(2).Infof("incrementallearning job %v/%v latest stage %v:", job.Namespace, job.Name, latestCondition.Stage) - pod = c.getSpecifiedPods(incrementaljob, string(latestCondition.Stage)) + pod = c.getSpecifiedPods(job, string(latestCondition.Stage)) if pod != nil { podStatus = pod.Status.Phase } } + jobStage := latestCondition.Stage currentType := latestCondition.Type newConditionType = currentType @@ -388,14 +384,14 @@ func (c *Controller) updateIncrementalJobConditions(incrementaljob *sednav1.Incr // include train, eval, deploy pod var err error if jobStage == sednav1.ILJobDeploy { - err = c.restartInferPod(incrementaljob) + err = c.restartInferPod(job) if err != nil { - klog.V(2).Infof("incrementallearning job %v/%v inference pod failed to restart, err:%s", incrementaljob.Namespace, incrementaljob.Name, err) + klog.V(2).Infof("incrementallearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) } else { - klog.V(2).Infof("incrementallearning job %v/%v inference pod restarts successfully", incrementaljob.Namespace, incrementaljob.Name) + 
klog.V(2).Infof("incrementallearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) } } else if podStatus != v1.PodPending && podStatus != v1.PodRunning { - err = c.createPod(incrementaljob, jobStage) + err = c.createPod(job, jobStage) } if err != nil { return needUpdated, err @@ -411,17 +407,17 @@ func (c *Controller) updateIncrementalJobConditions(incrementaljob *sednav1.Incr newConditionType = sednav1.ILJobStageCondRunning // add nodeName to job - if err := c.setWorkerNodeNameOfJob(incrementaljob, string(jobStage), pod.Spec.NodeName); err != nil { + if err := c.setWorkerNodeNameOfJob(job, string(jobStage), pod.Spec.NodeName); err != nil { return needUpdated, err } } } else if podStatus == v1.PodSucceeded { // watch pod status, if pod completed, set type completed newConditionType = sednav1.ILJobStageCondCompleted - klog.V(2).Infof("incrementallearning job %v/%v %v stage completed!", incrementaljob.Namespace, incrementaljob.Name, jobStage) + klog.V(2).Infof("incrementallearning job %v/%v %v stage completed!", job.Namespace, job.Name, jobStage) } else if podStatus == v1.PodFailed { newConditionType = sednav1.ILJobStageCondFailed - klog.V(2).Infof("incrementallearning job %v/%v %v stage failed!", incrementaljob.Namespace, incrementaljob.Name, jobStage) + klog.V(2).Infof("incrementallearning job %v/%v %v stage failed!", job.Namespace, job.Name, jobStage) } case sednav1.ILJobStageCondCompleted: jobStage = getNextStage(jobStage) @@ -434,31 +430,29 @@ func (c *Controller) updateIncrementalJobConditions(incrementaljob *sednav1.Incr default: // do nothing when given other type out of cases } - klog.V(2).Infof("incrementallearning job %v/%v, conditions: %v", incrementaljob.Namespace, incrementaljob.Name, jobConditions) + + klog.V(2).Infof("incrementallearning job %v/%v, conditions: %v", job.Namespace, job.Name, jobConditions) if latestCondition.Type != newConditionType { - incrementaljob.Status.Conditions = append(incrementaljob.Status.Conditions, NewIncrementalJobCondition(newConditionType, jobStage)) + job.Status.Conditions = append(job.Status.Conditions, NewIncrementalJobCondition(newConditionType, jobStage)) needUpdated = true - return needUpdated, nil } + return needUpdated, nil } -// updateIncrementalJobStatus ensures that jobstatus can be updated rightly -func (c *Controller) updateIncrementalJobStatus(incrementaljob *sednav1.IncrementalLearningJob) error { - jobClient := c.client.IncrementalLearningJobs(incrementaljob.Namespace) - var err error - for i := 0; i <= runtime.ResourceUpdateRetries; i++ { - var newIncrementalJob *sednav1.IncrementalLearningJob - newIncrementalJob, err = jobClient.Get(context.TODO(), incrementaljob.Name, metav1.GetOptions{}) +// updateJobStatus ensures that job status can be updated rightly +func (c *Controller) updateJobStatus(job *sednav1.IncrementalLearningJob) error { + jobClient := c.client.IncrementalLearningJobs(job.Namespace) + return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error { + newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) if err != nil { - break - } - newIncrementalJob.Status = incrementaljob.Status - if _, err = jobClient.UpdateStatus(context.TODO(), newIncrementalJob, metav1.UpdateOptions{}); err == nil { - break + return err } - } - return err + + newJob.Status = job.Status + _, err = jobClient.UpdateStatus(context.TODO(), newJob, metav1.UpdateOptions{}) + return err + }) } func NewIncrementalJobCondition(conditionType sednav1.ILJobStageConditionType, jobStage 
sednav1.ILJobStage) sednav1.ILJobCondition { @@ -478,21 +472,24 @@ func (c *Controller) generatePodName(jobName string, workerType string) string { } func (c *Controller) getSpecifiedPods(job *sednav1.IncrementalLearningJob, podType string) *v1.Pod { - if podType == "Deploy" { - podType = runtime.InferencePodType - } var latestPod *v1.Pod selector, _ := runtime.GenerateSelector(job) pods, err := c.podStore.Pods(job.Namespace).List(selector) if len(pods) == 0 || err != nil { return nil } + var matchTag = false latestPod = pods[0] + + if podType == "Deploy" { + podType = runtime.InferencePodType + } + for _, pod := range pods { s := strings.Split(pod.Name, "-") - CurrentPodType := s[len(s)-2] - if (latestPod.CreationTimestamp.Before(&pod.CreationTimestamp) || latestPod.CreationTimestamp.Equal(&pod.CreationTimestamp)) && CurrentPodType == strings.ToLower(podType) { + currentPodType := s[len(s)-2] + if (latestPod.CreationTimestamp.Before(&pod.CreationTimestamp) || latestPod.CreationTimestamp.Equal(&pod.CreationTimestamp)) && currentPodType == strings.ToLower(podType) { latestPod = pod matchTag = true } @@ -510,12 +507,14 @@ func (c *Controller) restartInferPod(job *sednav1.IncrementalLearningJob) error err := c.createInferPod(job) return err } + ctx := context.Background() err := c.kubeClient.CoreV1().Pods(job.Namespace).Delete(ctx, inferPod.Name, metav1.DeleteOptions{}) if err != nil { klog.Warningf("failed to delete inference pod %s for incrementallearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err) return err } + err = c.createInferPod(job) if err != nil { klog.Warningf("failed to create inference pod %s for incrementallearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err) @@ -537,7 +536,7 @@ func getNextStage(currentStage sednav1.ILJobStage) sednav1.ILJobStage { } } -func IsIncrementalJobFinished(j *sednav1.IncrementalLearningJob) bool { +func IsJobFinished(j *sednav1.IncrementalLearningJob) bool { // TODO return false } @@ -600,13 +599,14 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn } // get all url for train and eval from data in condition + var cond IncrementalCondData condDataStr := job.Status.Conditions[len(job.Status.Conditions)-1].Data klog.V(2).Infof("incrementallearning job %v/%v data condition:%s", job.Namespace, job.Name, condDataStr) - var cond IncrementalCondData (&cond).Unmarshal([]byte(condDataStr)) if cond.Input == nil { return fmt.Errorf("empty input from condData") } + dataURL := cond.Input.DataURL inputmodelURLs := cond.GetInputModelURLs() @@ -619,13 +619,14 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn originalDataURLOrIndex = dataset.Spec.URL } - var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) + var workerParam runtime.WorkerParam + if podtype == sednav1.ILJobTrain { workerParam.WorkerType = runtime.TrainPodType podTemplate = &job.Spec.TrainSpec.Template - // Env parameters for train + // Env parameters for train workerParam.Env = map[string]string{ "NAMESPACE": job.Namespace, "JOB_NAME": job.Name, @@ -688,10 +689,10 @@ func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sedn }, ) } else { + // Configure eval worker's mounts and envs podTemplate = &job.Spec.EvalSpec.Template workerParam.WorkerType = "Eval" - // Configure Env information for eval by initial runtime.WorkerParam workerParam.Env = map[string]string{ "NAMESPACE": job.Namespace, "JOB_NAME": job.Name, @@ -757,10 +758,7 @@ func (c *Controller) 
createPod(job *sednav1.IncrementalLearningJob, podtype sedn workerParam.HostNetwork = true // create pod based on podtype - _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, podTemplate, workerParam) - if err != nil { - return err - } + _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, podTemplate, &workerParam) return } @@ -771,19 +769,20 @@ func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error { return fmt.Errorf("failed to get infer model %s: %w", infermodelName, err) } + inferModelURL := inferModel.Spec.URL - // Env parameters for edge HEMParameterJSON, _ := json.Marshal(job.Spec.DeploySpec.HardExampleMining.Parameters) HEMParameterString := string(HEMParameterJSON) - // Configure container mounting and Env information by initial runtime.WorkerParam modelSecret, err := c.getSecret( job.Namespace, inferModel.Spec.CredentialName, fmt.Sprintf("model %s", inferModel.Name), ) - var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) + + // Configure inference worker's mounts and envs + var workerParam runtime.WorkerParam workerParam.Mounts = append(workerParam.Mounts, runtime.WorkerMount{ URL: &runtime.MountURL{ @@ -810,13 +809,13 @@ func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error { workerParam.WorkerType = runtime.InferencePodType workerParam.HostNetwork = true - // create edge pod - _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &job.Spec.DeploySpec.Template, workerParam) + // create the inference worker + _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &job.Spec.DeploySpec.Template, &workerParam) return err } -// New creates a new IncrementalJob controller that keeps the relevant pods -// in sync with their corresponding IncrementalJob objects. +// New creates a new incremental learning job controller that keeps the relevant pods +// in sync with the corresponding IncrementalLearningJob objects. 
func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { podInformer := cc.KubeInformerFactory.Core().V1().Pods() @@ -829,9 +828,9 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { kubeClient: cc.KubeClient, client: cc.SednaClient.SednaV1alpha1(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), "incrementallearningjob"), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "incrementallearningjob-controller"}), - cfg: cc.Config, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), + + cfg: cc.Config, } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -859,7 +858,5 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { jc.podStore = podInformer.Lister() jc.podStoreSynced = podInformer.Informer().HasSynced - jc.addUpstreamHandler(cc) - return jc, nil } diff --git a/pkg/globalmanager/controllers/incrementallearning/upstream.go b/pkg/globalmanager/controllers/incrementallearning/upstream.go index fa3975a4a..7932a0038 100644 --- a/pkg/globalmanager/controllers/incrementallearning/upstream.go +++ b/pkg/globalmanager/controllers/incrementallearning/upstream.go @@ -157,6 +157,6 @@ func (c *Controller) updateFromEdge(name, namespace, operation string, content [ return nil } -func (c *Controller) addUpstreamHandler(cc *runtime.ControllerContext) error { - return cc.UpstreamController.Add(KindName, c.updateFromEdge) +func (c *Controller) SetUpstreamHandler(addFunc runtime.UpstreamHandlerAddFunc) error { + return addFunc(KindName, c.updateFromEdge) } diff --git a/pkg/globalmanager/controllers/jointinference/downstream.go b/pkg/globalmanager/controllers/jointinference/downstream.go index 31778ef67..99b2563d6 100644 --- a/pkg/globalmanager/controllers/jointinference/downstream.go +++ b/pkg/globalmanager/controllers/jointinference/downstream.go @@ -31,6 +31,11 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro return nil } + // Since Kind may be empty, + // we need to fix the kind here if missing. + // more details at https://github.com/kubernetes/kubernetes/issues/3030 + joint.Kind = KindName + // Here only propagate to the nodes with non empty name // FIXME: only the case that Spec.NodeName specified is support nodeName := joint.Spec.EdgeWorker.Template.Spec.NodeName diff --git a/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go b/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go index 7a1821194..faff1143b 100644 --- a/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go +++ b/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go @@ -75,7 +75,7 @@ type Controller struct { // A store of pods podStore corelisters.PodLister - // serviceStoreSynced returns true if the jointinferenceservice store has been synced at least once. + // serviceStoreSynced returns true if the JointInferenceService store has been synced at least once. serviceStoreSynced cache.InformerSynced // A store of service serviceLister sednav1listers.JointInferenceServiceLister @@ -114,7 +114,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { <-stopCh } -// enqueueByPod enqueues the jointInferenceService object of the specified pod. +// enqueueByPod enqueues the JointInferenceService object of the specified pod. 
func (c *Controller) enqueueByPod(pod *v1.Pod, immediate bool) { controllerRef := metav1.GetControllerOf(pod) @@ -167,7 +167,7 @@ func (c *Controller) updatePod(old, cur interface{}) { c.addPod(curPod) } -// deletePod enqueues the jointinferenceservice obj When a pod is deleted +// deletePod enqueues the JointinferenceService obj When a pod is deleted func (c *Controller) deletePod(obj interface{}) { pod, ok := obj.(*v1.Pod) @@ -176,7 +176,7 @@ func (c *Controller) deletePod(obj interface{}) { // When a delete is dropped, the relist will notice a pod in the store not // in the list, leading to the insertion of a tombstone object which contains // the deleted key/value. Note that this value might be stale. If the pod - // changed labels the new jointinferenceservice will not be woken up till the periodic resync. + // changed labels the new JointInferenceService will not be woken up till the periodic resync. if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { @@ -252,7 +252,7 @@ func (c *Controller) sync(key string) (bool, error) { if len(ns) == 0 || len(name) == 0 { return false, fmt.Errorf("invalid jointinference service key %q: either namespace or name is missing", key) } - sharedJointinferenceservice, err := c.serviceLister.JointInferenceServices(ns).Get(name) + sharedService, err := c.serviceLister.JointInferenceServices(ns).Get(name) if err != nil { if errors.IsNotFound(err) { klog.V(4).Infof("JointInferenceService has been deleted: %v", key) @@ -261,37 +261,38 @@ func (c *Controller) sync(key string) (bool, error) { return false, err } - jointinferenceservice := *sharedJointinferenceservice + service := *sharedService - // if jointinferenceservice was finished previously, we don't want to redo the termination - if isJointinferenceserviceFinished(&jointinferenceservice) { + // if service was finished previously, we don't want to redo the termination + if isServiceFinished(&service) { return true, nil } - // set kind for jointinferenceservice in case that the kind is None + // set kind for service in case that the kind is None // more details at https://github.com/kubernetes/kubernetes/issues/3030 - jointinferenceservice.SetGroupVersionKind(Kind) + service.SetGroupVersionKind(Kind) - selector, _ := runtime.GenerateSelector(&jointinferenceservice) - pods, err := c.podStore.Pods(jointinferenceservice.Namespace).List(selector) + selector, _ := runtime.GenerateSelector(&service) + pods, err := c.podStore.Pods(service.Namespace).List(selector) if err != nil { return false, err } - klog.V(4).Infof("list jointinference service %v/%v, %v pods: %v", jointinferenceservice.Namespace, jointinferenceservice.Name, len(pods), pods) + klog.V(4).Infof("list jointinference service %v/%v, %v pods: %v", service.Namespace, service.Name, len(pods), pods) - latestConditionLen := len(jointinferenceservice.Status.Conditions) + latestConditionLen := len(service.Status.Conditions) active := runtime.CalcActivePodCount(pods) var failed int32 = 0 + // neededCounts means that two pods should be created successfully in a jointinference service currently // two pods consist of edge pod and cloud pod var neededCounts int32 = 2 - // jointinferenceservice first start - if jointinferenceservice.Status.StartTime == nil { + + if service.Status.StartTime == nil { now := metav1.Now() - jointinferenceservice.Status.StartTime = &now + service.Status.StartTime = &now } else { failed = neededCounts - active } @@ -303,7 +304,7 @@ func (c *Controller) sync(key string) (bool, error) { // get the latest condition 
type // based on that condition updated is appended, not inserted. - jobConditions := jointinferenceservice.Status.Conditions + jobConditions := service.Status.Conditions if len(jobConditions) > 0 { latestConditionType = (jobConditions)[len(jobConditions)-1].Type } @@ -316,12 +317,12 @@ func (c *Controller) sync(key string) (bool, error) { serviceFailed = true // TODO: get the failed worker, and knows that which worker fails, edge inference worker or cloud inference worker reason = "workerFailed" - message = "the worker of Jointinferenceservice failed" + message = "the worker of service failed" newCondtionType = sednav1.JointInferenceServiceCondFailed - c.recorder.Event(&jointinferenceservice, v1.EventTypeWarning, reason, message) + c.recorder.Event(&service, v1.EventTypeWarning, reason, message) } else { if len(pods) == 0 { - active, manageServiceErr = c.createWorkers(&jointinferenceservice) + active, manageServiceErr = c.createWorkers(&service) } if manageServiceErr != nil { serviceFailed = true @@ -336,20 +337,20 @@ func (c *Controller) sync(key string) (bool, error) { // if newCondtionType != latestConditionType { - jointinferenceservice.Status.Conditions = append(jointinferenceservice.Status.Conditions, NewJointInferenceServiceCondition(newCondtionType, reason, message)) + service.Status.Conditions = append(service.Status.Conditions, newServiceCondition(newCondtionType, reason, message)) } forget := false // no need to update the jointinferenceservice if the status hasn't changed since last time - if jointinferenceservice.Status.Active != active || jointinferenceservice.Status.Failed != failed || len(jointinferenceservice.Status.Conditions) != latestConditionLen { - jointinferenceservice.Status.Active = active - jointinferenceservice.Status.Failed = failed + if service.Status.Active != active || service.Status.Failed != failed || len(service.Status.Conditions) != latestConditionLen { + service.Status.Active = active + service.Status.Failed = failed - if err := c.updateStatus(&jointinferenceservice); err != nil { + if err := c.updateStatus(&service); err != nil { return forget, err } - if serviceFailed && !isJointinferenceserviceFinished(&jointinferenceservice) { + if serviceFailed && !isServiceFinished(&service) { // returning an error will re-enqueue jointinferenceservice after the backoff period return forget, fmt.Errorf("failed pod(s) detected for jointinference service key %q", key) } @@ -360,8 +361,8 @@ func (c *Controller) sync(key string) (bool, error) { return forget, manageServiceErr } -// NewJointInferenceServiceCondition creates a new joint condition -func NewJointInferenceServiceCondition(conditionType sednav1.JointInferenceServiceConditionType, reason, message string) sednav1.JointInferenceServiceCondition { +// newServiceCondition creates a new joint condition +func newServiceCondition(conditionType sednav1.JointInferenceServiceConditionType, reason, message string) sednav1.JointInferenceServiceCondition { return sednav1.JointInferenceServiceCondition{ Type: conditionType, Status: v1.ConditionTrue, @@ -372,24 +373,20 @@ func NewJointInferenceServiceCondition(conditionType sednav1.JointInferenceServi } } -func (c *Controller) updateStatus(jointinferenceservice *sednav1.JointInferenceService) error { - serviceClient := c.client.JointInferenceServices(jointinferenceservice.Namespace) - var err error - for i := 0; i <= runtime.ResourceUpdateRetries; i = i + 1 { - var newJointinferenceservice *sednav1.JointInferenceService - newJointinferenceservice, err = 
serviceClient.Get(context.TODO(), jointinferenceservice.Name, metav1.GetOptions{}) +func (c *Controller) updateStatus(service *sednav1.JointInferenceService) error { + client := c.client.JointInferenceServices(service.Namespace) + return runtime.RetryUpdateStatus(service.Name, service.Namespace, func() error { + newService, err := client.Get(context.TODO(), service.Name, metav1.GetOptions{}) if err != nil { - break - } - newJointinferenceservice.Status = jointinferenceservice.Status - if _, err = serviceClient.UpdateStatus(context.TODO(), newJointinferenceservice, metav1.UpdateOptions{}); err == nil { - break + return err } - } - return nil + newService.Status = service.Status + _, err = client.UpdateStatus(context.TODO(), newService, metav1.UpdateOptions{}) + return err + }) } -func isJointinferenceserviceFinished(j *sednav1.JointInferenceService) bool { +func isServiceFinished(j *sednav1.JointInferenceService) bool { for _, c := range j.Status.Conditions { if (c.Type == sednav1.JointInferenceServiceCondFailed) && c.Status == v1.ConditionTrue { return true @@ -586,7 +583,5 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { jc.podStore = podInformer.Lister() jc.podStoreSynced = podInformer.Informer().HasSynced - jc.addUpstreamHandler(cc) - return jc, nil } diff --git a/pkg/globalmanager/controllers/jointinference/upstream.go b/pkg/globalmanager/controllers/jointinference/upstream.go index ceff6e771..93d0fa7e9 100644 --- a/pkg/globalmanager/controllers/jointinference/upstream.go +++ b/pkg/globalmanager/controllers/jointinference/upstream.go @@ -87,6 +87,6 @@ func (c *Controller) updateFromEdge(name, namespace, operation string, content [ return nil } -func (c *Controller) addUpstreamHandler(cc *runtime.ControllerContext) error { - return cc.UpstreamController.Add(KindName, c.updateFromEdge) +func (c *Controller) SetUpstreamHandler(addFunc runtime.UpstreamHandlerAddFunc) error { + return addFunc(KindName, c.updateFromEdge) } diff --git a/pkg/globalmanager/controllers/lifelonglearning/downstream.go b/pkg/globalmanager/controllers/lifelonglearning/downstream.go index 2f33516a3..8b9ef5faa 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/downstream.go +++ b/pkg/globalmanager/controllers/lifelonglearning/downstream.go @@ -30,6 +30,12 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro if !ok { return nil } + + // Since Kind may be empty, + // we need to fix the kind here if missing. 
+ // more details at https://github.com/kubernetes/kubernetes/issues/3030 + job.Kind = KindName + // Here only propagate to the nodes with non empty name // FIXME(llhuii): only the case that all workers having the same nodeName are support, diff --git a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go index d7d2dbef3..a946ca7cb 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" @@ -78,8 +77,6 @@ type Controller struct { // LifelongLearningJobs that need to be updated queue workqueue.RateLimitingInterface - recorder record.EventRecorder - cfg *config.ControllerConfig sendToEdgeFunc runtime.DownstreamSendFunc @@ -248,7 +245,7 @@ func (c *Controller) sync(key string) (bool, error) { if len(ns) == 0 || len(name) == 0 { return false, fmt.Errorf("invalid lifelonglearning job key %q: either namespace or name is missing", key) } - sharedLifelongLearningJob, err := c.jobLister.LifelongLearningJobs(ns).Get(name) + sharedJob, err := c.jobLister.LifelongLearningJobs(ns).Get(name) if err != nil { if errors.IsNotFound(err) { klog.V(4).Infof("lifelonglearning job has been deleted: %v", key) @@ -256,18 +253,18 @@ func (c *Controller) sync(key string) (bool, error) { } return false, err } - lifelonglearningjob := *sharedLifelongLearningJob + job := *sharedJob // set kind for lifelonglearningjob in case that the kind is None - lifelonglearningjob.SetGroupVersionKind(sednav1.SchemeGroupVersion.WithKind("LifelongLearningJob")) + job.SetGroupVersionKind(Kind) - // lifelonglearningjob first start - if lifelonglearningjob.Status.StartTime == nil { + if job.Status.StartTime == nil { + // job is first in now := metav1.Now() - lifelonglearningjob.Status.StartTime = &now + job.Status.StartTime = &now } - // if lifelonglearningjob was finished previously, we don't want to redo the termination - if IsLifelongLearningJobFinished(&lifelonglearningjob) { + // if job was finished previously, we don't want to redo the termination + if IsJobFinished(&job) { return true, nil } @@ -275,18 +272,18 @@ func (c *Controller) sync(key string) (bool, error) { jobFailed := false needUpdated := false - // update conditions of lifelonglearning job - needUpdated, err = c.updateLifelongLearningJobConditions(&lifelonglearningjob) + // transit this job's state machine + needUpdated, err = c.transitJobState(&job) if err != nil { - klog.V(2).Infof("lifelonglearning job %v/%v faied to be updated, err:%s", lifelonglearningjob.Namespace, lifelonglearningjob.Name, err) + klog.V(2).Infof("lifelonglearning job %v/%v faied to be updated, err:%s", job.Namespace, job.Name, err) } if needUpdated { - if err := c.updateLifelongLearningJobStatus(&lifelonglearningjob); err != nil { + if err := c.updateJobStatus(&job); err != nil { return forget, err } - if jobFailed && !IsLifelongLearningJobFinished(&lifelonglearningjob) { + if jobFailed && !IsJobFinished(&job) { // returning an error will re-enqueue LifelongLearningJob after the backoff period return forget, fmt.Errorf("failed pod(s) detected for lifelonglearningjob key %q", key) } @@ -297,24 +294,25 @@ func (c 
*Controller) sync(key string) (bool, error) { return forget, err } -// updateLifelongLearningJobConditions ensures that conditions of lifelonglearning job can be changed by podstatus -func (c *Controller) updateLifelongLearningJobConditions(lifelonglearningjob *sednav1.LifelongLearningJob) (bool, error) { +// transitJobState transit job to next state +func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, error) { var initialType sednav1.LLJobStageConditionType var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{ Stage: sednav1.LLJobTrain, Type: initialType, } + var newConditionType sednav1.LLJobStageConditionType - latestCondition.Stage = sednav1.LLJobTrain var needUpdated = false - jobConditions := lifelonglearningjob.Status.Conditions + var podStatus v1.PodPhase = v1.PodUnknown + jobConditions := job.Status.Conditions if len(jobConditions) > 0 { // get latest pod and pod status latestCondition = (jobConditions)[len(jobConditions)-1] - klog.V(2).Infof("lifelonglearning job %v/%v latest stage %v:", lifelonglearningjob.Namespace, lifelonglearningjob.Name, + klog.V(2).Infof("lifelonglearning job %v/%v latest stage %v:", job.Namespace, job.Name, latestCondition.Stage) - pod := c.getSpecifiedPods(lifelonglearningjob, string(latestCondition.Stage)) + pod := c.getSpecifiedPods(job, string(latestCondition.Stage)) if pod != nil { podStatus = pod.Status.Phase @@ -336,14 +334,14 @@ func (c *Controller) updateLifelongLearningJobConditions(lifelonglearningjob *se // include train, eval, deploy pod var err error if jobStage == sednav1.LLJobDeploy { - err = c.restartInferPod(lifelonglearningjob) + err = c.restartInferPod(job) if err != nil { - klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", lifelonglearningjob.Namespace, lifelonglearningjob.Name, err) + klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) } else { - klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", lifelonglearningjob.Namespace, lifelonglearningjob.Name) + klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) } } else if podStatus != v1.PodPending && podStatus != v1.PodRunning { - err = c.createPod(lifelonglearningjob, jobStage) + err = c.createPod(job, jobStage) } if err != nil { return needUpdated, err @@ -361,10 +359,10 @@ func (c *Controller) updateLifelongLearningJobConditions(lifelonglearningjob *se } else if podStatus == v1.PodSucceeded { // watch pod status, if pod completed, set type completed newConditionType = sednav1.LLJobStageCondCompleted - klog.V(2).Infof("lifelonglearning job %v/%v %v stage completed!", lifelonglearningjob.Namespace, lifelonglearningjob.Name, jobStage) + klog.V(2).Infof("lifelonglearning job %v/%v %v stage completed!", job.Namespace, job.Name, jobStage) } else if podStatus == v1.PodFailed { newConditionType = sednav1.LLJobStageCondFailed - klog.V(2).Infof("lifelonglearning job %v/%v %v stage failed!", lifelonglearningjob.Namespace, lifelonglearningjob.Name, jobStage) + klog.V(2).Infof("lifelonglearning job %v/%v %v stage failed!", job.Namespace, job.Name, jobStage) } case sednav1.LLJobStageCondCompleted: jobStage = c.getNextStage(jobStage) @@ -377,34 +375,31 @@ func (c *Controller) updateLifelongLearningJobConditions(lifelonglearningjob *se default: // do nothing when given other type out of cases } - klog.V(2).Infof("lifelonglearning job %v/%v, conditions: %v", 
lifelonglearningjob.Namespace, lifelonglearningjob.Name, jobConditions) + + klog.V(2).Infof("lifelonglearning job %v/%v, conditions: %v", job.Namespace, job.Name, jobConditions) if latestCondition.Type != newConditionType { - lifelonglearningjob.Status.Conditions = append(lifelonglearningjob.Status.Conditions, NewLifelongLearningJobCondition(newConditionType, jobStage)) + job.Status.Conditions = append(job.Status.Conditions, NewJobCondition(newConditionType, jobStage)) needUpdated = true return needUpdated, nil } return needUpdated, nil } -// updateLifelongLearningJobStatus ensures that jobstatus can be updated rightly -func (c *Controller) updateLifelongLearningJobStatus(lifelonglearningjob *sednav1.LifelongLearningJob) error { - jobClient := c.client.LifelongLearningJobs(lifelonglearningjob.Namespace) - var err error - for i := 0; i <= runtime.ResourceUpdateRetries; i = i + 1 { - var newLifelongLearningJob *sednav1.LifelongLearningJob - newLifelongLearningJob, err = jobClient.Get(context.TODO(), lifelonglearningjob.Name, metav1.GetOptions{}) +// updateJobStatus ensures that jobstatus can be updated rightly +func (c *Controller) updateJobStatus(job *sednav1.LifelongLearningJob) error { + jobClient := c.client.LifelongLearningJobs(job.Namespace) + return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error { + newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) if err != nil { - break - } - newLifelongLearningJob.Status = lifelonglearningjob.Status - if _, err = jobClient.UpdateStatus(context.TODO(), newLifelongLearningJob, metav1.UpdateOptions{}); err == nil { - break + return err } - } - return err + newJob.Status = job.Status + _, err = jobClient.UpdateStatus(context.TODO(), newJob, metav1.UpdateOptions{}) + return err + }) } -func NewLifelongLearningJobCondition(conditionType sednav1.LLJobStageConditionType, jobStage sednav1.LLJobStage) sednav1.LLJobCondition { +func NewJobCondition(conditionType sednav1.LLJobStageConditionType, jobStage sednav1.LLJobStage) sednav1.LLJobCondition { return sednav1.LLJobCondition{ Type: conditionType, Status: v1.ConditionTrue, @@ -492,7 +487,7 @@ func (c *Controller) getSecret(namespace, name string, ownerStr string) (secret return } -func IsLifelongLearningJobFinished(j *sednav1.LifelongLearningJob) bool { +func IsJobFinished(j *sednav1.LifelongLearningJob) bool { // TODO return false } @@ -529,7 +524,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 // get all url for train and eval from data in condition condDataStr := job.Status.Conditions[len(job.Status.Conditions)-1].Data klog.V(2).Infof("lifelonglearning job %v/%v data condition:%s", job.Namespace, job.Name, condDataStr) - var cond LifelongLearningCondData + var cond ConditionData (&cond).Unmarshal([]byte(condDataStr)) if cond.Input == nil { return fmt.Errorf("empty input from condData") @@ -596,7 +591,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 podTemplate = &job.Spec.EvalSpec.Template workerParam.WorkerType = "Eval" - // Configure Env information for eval by initial runtime.WorkerParam + // Configure Env information for eval by initial WorkerParam workerParam.Env = map[string]string{ "NAMESPACE": job.Namespace, "JOB_NAME": job.Name, @@ -721,8 +716,7 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { jc := &Controller{ kubeClient: cc.KubeClient, client: cc.SednaClient.SednaV1alpha1(), - queue: 
-		recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "lifelonglearningjob-controller"}),
+		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name),
 
 		cfg: cfg,
 	}
@@ -751,7 +745,5 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
 	jc.podStore = podInformer.Lister()
 	jc.podStoreSynced = podInformer.Informer().HasSynced
 
-	jc.addUpstreamHandler(cc)
-
 	return jc, nil
 }
diff --git a/pkg/globalmanager/controllers/lifelonglearning/upstream.go b/pkg/globalmanager/controllers/lifelonglearning/upstream.go
index 1c5e768f6..011c60ec7 100644
--- a/pkg/globalmanager/controllers/lifelonglearning/upstream.go
+++ b/pkg/globalmanager/controllers/lifelonglearning/upstream.go
@@ -32,7 +32,7 @@ import (
 type Model = runtime.Model
 
 // the data of this condition including the input/output to do the next step
-type LifelongLearningCondData struct {
+type ConditionData struct {
 	Input *struct {
 		// Only one model cases
 		Model *Model `json:"model,omitempty"`
@@ -57,7 +57,7 @@ type LifelongLearningCondData struct {
 	} `json:"output,omitempty"`
 }
 
-func (cd *LifelongLearningCondData) joinModelURLs(model *Model, models []Model) []string {
+func (cd *ConditionData) joinModelURLs(model *Model, models []Model) []string {
 	var modelURLs []string
 	if model != nil {
 		modelURLs = append(modelURLs, model.GetURL())
@@ -69,19 +69,19 @@ func (cd *LifelongLearningCondData) joinModelURLs(model *Model, models []Model)
 	return modelURLs
 }
 
-func (cd *LifelongLearningCondData) Unmarshal(data []byte) error {
+func (cd *ConditionData) Unmarshal(data []byte) error {
 	return json.Unmarshal(data, cd)
 }
 
-func (cd LifelongLearningCondData) Marshal() ([]byte, error) {
+func (cd ConditionData) Marshal() ([]byte, error) {
 	return json.Marshal(cd)
 }
 
-func (cd *LifelongLearningCondData) GetInputModelURLs() []string {
+func (cd *ConditionData) GetInputModelURLs() []string {
 	return cd.joinModelURLs(cd.Input.Model, cd.Input.Models)
 }
 
-func (cd *LifelongLearningCondData) GetOutputModelURLs() []string {
+func (cd *ConditionData) GetOutputModelURLs() []string {
 	return cd.joinModelURLs(cd.Output.Model, cd.Output.Models)
 }
 
@@ -112,7 +112,7 @@ func (c *Controller) updateFromEdge(name, namespace, operation string, content [
 	// Get the condition data.
 	// Here unmarshal and marshal immediately to skip the unnecessary fields
-	var condData LifelongLearningCondData
+	var condData ConditionData
 	err = json.Unmarshal(content, &condData)
 	if err != nil {
 		return err
@@ -159,6 +159,6 @@ func (c *Controller) updateFromEdge(name, namespace, operation string, content [
 	return nil
 }
 
-func (c *Controller) addUpstreamHandler(cc *runtime.ControllerContext) error {
-	return cc.UpstreamController.Add(KindName, c.updateFromEdge)
+func (c *Controller) SetUpstreamHandler(addFunc runtime.UpstreamHandlerAddFunc) error {
+	return addFunc(KindName, c.updateFromEdge)
 }
diff --git a/pkg/globalmanager/controllers/manager.go b/pkg/globalmanager/controllers/manager.go
index 853280239..42feb40ec 100644
--- a/pkg/globalmanager/controllers/manager.go
+++ b/pkg/globalmanager/controllers/manager.go
@@ -76,7 +76,7 @@ func (m *Manager) Start() error {
 		namespace = metav1.NamespaceAll
 	}
 
-	// make this period configurable
+	// TODO(llhuii): make this period configurable
 	minResyncPeriod := time.Second * 30
 
 	kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, genResyncPeriod(minResyncPeriod), kubeinformers.WithNamespace(namespace))
@@ -94,15 +94,11 @@ func (m *Manager) Start() error {
 	}
 
 	uc, _ := NewUpstreamController(context)
-	context.UpstreamController = uc
 
 	downstreamSendFunc := messagelayer.NewContextMessageLayer().SendResourceObject
 
 	stopCh := make(chan struct{})
-	kubeInformerFactory.Start(stopCh)
-	sednaInformerFactory.Start(stopCh)
-
 	go uc.Run(stopCh)
 
 	for name, factory := range NewRegistry() {
@@ -111,11 +107,15 @@ func (m *Manager) Start() error {
 			return fmt.Errorf("failed to initialize controller %s: %v", name, err)
 		}
 		f.SetDownstreamSendFunc(downstreamSendFunc)
+		f.SetUpstreamHandler(uc.Add)
+
+		klog.Infof("initialized controller %s", name)
 		go f.Run(stopCh)
-		klog.Infof("started controller %s", name)
 	}
 
+	kubeInformerFactory.Start(stopCh)
+	sednaInformerFactory.Start(stopCh)
+
 	addr := fmt.Sprintf("%s:%d", m.Config.WebSocket.Address, m.Config.WebSocket.Port)
 	ws := websocket.NewServer(addr)
diff --git a/pkg/globalmanager/controllers/upstream.go b/pkg/globalmanager/controllers/upstream.go
index 9e6a22162..c02f2c571 100644
--- a/pkg/globalmanager/controllers/upstream.go
+++ b/pkg/globalmanager/controllers/upstream.go
@@ -29,13 +29,13 @@ import (
 // UpstreamController subscribes the updates from edge and syncs to k8s api server
 type UpstreamController struct {
 	messageLayer   messagelayer.MessageLayer
-	updateHandlers map[string]runtime.UpstreamUpdateHandler
+	updateHandlers map[string]runtime.UpstreamHandler
 }
 
 func (uc *UpstreamController) checkOperation(operation string) error {
 	// current only support the 'status' operation
 	if operation != "status" {
-		return fmt.Errorf("unknown operation %s", operation)
+		return fmt.Errorf("unknown operation '%s'", operation)
 	}
 	return nil
 }
@@ -84,7 +84,7 @@ func (uc *UpstreamController) Run(stopCh <-chan struct{}) {
 	<-stopCh
 }
 
-func (uc *UpstreamController) Add(kind string, handler runtime.UpstreamUpdateHandler) error {
+func (uc *UpstreamController) Add(kind string, handler runtime.UpstreamHandler) error {
 	kind = strings.ToLower(kind)
 	if _, ok := uc.updateHandlers[kind]; ok {
 		return fmt.Errorf("a upstream handler for kind %s already exists", kind)
@@ -95,10 +95,10 @@ func (uc *UpstreamController) Add(kind string, handler runtime.UpstreamUpdateHan
 }
 
 // NewUpstreamController creates a new Upstream controller from config
-func NewUpstreamController(cc *runtime.ControllerContext) (runtime.UpstreamControllerI, error) {
+func NewUpstreamController(cc *runtime.ControllerContext) (*UpstreamController, error) {
 	uc := &UpstreamController{
 		messageLayer:   messagelayer.NewContextMessageLayer(),
-		updateHandlers: make(map[string]runtime.UpstreamUpdateHandler),
+		updateHandlers: make(map[string]runtime.UpstreamHandler),
 	}
 
 	return uc, nil
diff --git a/pkg/globalmanager/runtime/common.go b/pkg/globalmanager/runtime/common.go
index 47bc7e0e9..e85c15c00 100644
--- a/pkg/globalmanager/runtime/common.go
+++ b/pkg/globalmanager/runtime/common.go
@@ -34,12 +34,8 @@ import (
 )
 
 const (
-	// DefaultBackOff is the default backoff period
-	DefaultBackOff = 10 * time.Second
-	// MaxBackOff is the max backoff period
-	MaxBackOff = 360 * time.Second
-	// ResourceUpdateRetries defines times of retrying to update resource
-	ResourceUpdateRetries = 3
+	// resourceUpdateTries defines the number of tries to update a resource
+	resourceUpdateTries = 3
 )
 
 // GetNodeIPByName get node ip by node name
@@ -152,17 +148,15 @@ func ConvertMapToMetrics(metric map[string]interface{}) []sednav1.Metric {
 	return l
 }
 
-const upstreamStatusUpdateRetries = 3
-
 // RetryUpdateStatus simply retries to call the status update func
 func RetryUpdateStatus(name, namespace string, updateStatusFunc func() error) error {
 	var err error
-	for retry := 0; retry <= upstreamStatusUpdateRetries; retry++ {
+	for try := 1; try <= resourceUpdateTries; try++ {
 		err = updateStatusFunc()
 		if err == nil {
 			return nil
 		}
-		klog.Warningf("Error to update %s/%s status, retried %d times: %+v", namespace, name, retry, err)
+		klog.Warningf("Failed to update %s/%s status, tried %d times: %+v", namespace, name, try, err)
 	}
 	return err
 }
diff --git a/pkg/globalmanager/runtime/secret_injector.go b/pkg/globalmanager/runtime/secret_injector.go
index 4386a034d..8c986f419 100644
--- a/pkg/globalmanager/runtime/secret_injector.go
+++ b/pkg/globalmanager/runtime/secret_injector.go
@@ -121,7 +121,6 @@ func InjectSecretAnnotations(client kubernetes.Interface, obj CommonInterface, s
 }
 
 func injectSecretObj(obj CommonInterface, secret *v1.Secret) (err error) {
-
 	secretData := secret.GetAnnotations()
 
 	for k, v := range secret.Data {
diff --git a/pkg/globalmanager/runtime/types.go b/pkg/globalmanager/runtime/types.go
index ebbcf61f6..4a2c075d7 100644
--- a/pkg/globalmanager/runtime/types.go
+++ b/pkg/globalmanager/runtime/types.go
@@ -17,6 +17,8 @@ limitations under the License.
 package runtime
 
 import (
+	"time"
+
 	"github.com/kubeedge/sedna/pkg/globalmanager/config"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	k8sruntime "k8s.io/apimachinery/pkg/runtime"
@@ -29,31 +31,12 @@ import (
 	sednainformers "github.com/kubeedge/sedna/pkg/client/informers/externalversions"
 )
 
-// CommonInterface describes the commom interface of CRs
-type CommonInterface interface {
-	metav1.Object
-	schema.ObjectKind
-	k8sruntime.Object
-}
-
-// BaseControllerI defines the interface of an controller
-type BaseControllerI interface {
-	Run(stopCh <-chan struct{})
-}
-
-// FeatureControllerI defines the interface of an AI Feature controller
-type FeatureControllerI interface {
-	BaseControllerI
-	SetDownstreamSendFunc(f DownstreamSendFunc) error
-}
-
-type Model struct {
-	Format  string                 `json:"format,omitempty"`
-	URL     string                 `json:"url,omitempty"`
-	Metrics map[string]interface{} `json:"metrics,omitempty"`
-}
-
 const (
+	// DefaultBackOff is the default backoff period
+	DefaultBackOff = 10 * time.Second
+	// MaxBackOff is the max backoff period
+	MaxBackOff = 360 * time.Second
+
 	// TrainPodType is type of train pod
 	TrainPodType = "train"
 	// EvalPodType is type of eval pod
@@ -65,24 +48,52 @@ const (
 	AnnotationsKeyPrefix = "sedna.io/"
 )
 
+type Model struct {
+	Format  string                 `json:"format,omitempty"`
+	URL     string                 `json:"url,omitempty"`
+	Metrics map[string]interface{} `json:"metrics,omitempty"`
+}
+
 func (m *Model) GetURL() string {
 	return m.URL
 }
 
-// updateHandler handles the updates from LC(running at edge) to update the
-// corresponding resource
-type UpstreamUpdateHandler func(namespace, name, operation string, content []byte) error
-
-type UpstreamControllerI interface {
-	BaseControllerI
-	Add(kind string, updateHandler UpstreamUpdateHandler) error
+// CommonInterface describes the common interface of CRs
+type CommonInterface interface {
+	metav1.Object
+	schema.ObjectKind
+	k8sruntime.Object
 }
 
+// UpstreamHandler is the function definition for handling the upstream updates,
+// i.e. resource updates (mainly status) from the LC (running at the edge)
+type UpstreamHandler = func(namespace, name, operation string, content []byte) error
+
+// UpstreamHandlerAddFunc defines the upstream controller's register function for adding a handler
+type UpstreamHandlerAddFunc = func(kind string, updateHandler UpstreamHandler) error
+
+// DownstreamSendFunc is the send function for feature controllers to sync resource updates (spec and status) to the LC
 type DownstreamSendFunc = func(nodeName string, eventType watch.EventType, obj interface{}) error
 
+// BaseControllerI defines the interface of a controller
+type BaseControllerI interface {
+	Run(stopCh <-chan struct{})
+}
+
+// FeatureControllerI defines the interface of an AI Feature controller
+type FeatureControllerI interface {
+	BaseControllerI
+
+	// SetDownstreamSendFunc sets up the downstream send function in the feature controller
+	SetDownstreamSendFunc(f DownstreamSendFunc) error
+
+	// SetUpstreamHandler sets up the upstream handler function for the feature controller
+	SetUpstreamHandler(add UpstreamHandlerAddFunc) error
+}
+
+// ControllerContext defines the context that all feature controllers share and belong to
 type ControllerContext struct {
-	Config             *config.ControllerConfig
-	UpstreamController UpstreamControllerI
+	Config *config.ControllerConfig
 
 	KubeClient          kubernetes.Interface
 	KubeInformerFactory kubeinformers.SharedInformerFactory
diff --git a/pkg/globalmanager/runtime/worker.go b/pkg/globalmanager/runtime/worker.go
index fab3dd139..df7208f41 100644
--- a/pkg/globalmanager/runtime/worker.go
+++ b/pkg/globalmanager/runtime/worker.go
@@ -105,7 +105,7 @@ func CreateKubernetesService(kubeClient kubernetes.Interface, object CommonInter
 	return service.Spec.Ports[0].NodePort, nil
 }
 
-// injectWorkerParam.Modifies pod in-place
+// injectWorkerParam modifies the pod in-place
 func injectWorkerParam(pod *v1.Pod, workerParam *WorkerParam, object CommonInterface) {
 	InjectStorageInitializer(pod, workerParam)