Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gm: decouple all features into independent package #134

Merged
merged 7 commits into from
Aug 3, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pkg/globalmanager/controllers/dataset/dataset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package dataset

import (
"context"
"encoding/json"

sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
"github.com/kubeedge/sedna/pkg/globalmanager/config"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

sednaclientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
sednav1listers "github.com/kubeedge/sedna/pkg/client/listers/sedna/v1alpha1"
"github.com/kubeedge/sedna/pkg/globalmanager/runtime"
)

// Controller handles all dataset objects, including syncing them to the edge
// and applying updates reported from the edge.
type Controller struct {
// client is the typed Sedna v1alpha1 client; the methods in this file use it
// to fetch datasets and to write their status.
client sednaclientset.SednaV1alpha1Interface

// storeSynced is the informer-synced check for the dataset store.
// NOTE(review): not referenced in the code shown here — confirm where it is set.
storeSynced cache.InformerSynced

// lister is a store of datasets.
lister sednav1listers.DatasetLister

// cfg is the controller configuration.
cfg *config.ControllerConfig
}

// updateDatasetFromEdge applies a dataset status reported by the edge.
//
// content is decoded as a JSON-encoded sednav1.DatasetStatus and handed to
// updateDatasetStatus for the dataset identified by namespace/name. A decode
// failure is returned unchanged. The operation argument is not consulted.
func (c *Controller) updateDatasetFromEdge(name, namespace, operation string, content []byte) error {
	var reported sednav1.DatasetStatus
	if decodeErr := json.Unmarshal(content, &reported); decodeErr != nil {
		return decodeErr
	}

	return c.updateDatasetStatus(name, namespace, reported)
}

// updateDatasetStatus overwrites the status of the dataset namespace/name.
//
// When status carries no UpdateTime, the current time is filled in first.
// The get-then-UpdateStatus sequence is wrapped in a closure and run through
// runtime.RetryUpdateStatus (presumably retried on failure — see that helper),
// so every attempt starts from a freshly fetched object.
func (c *Controller) updateDatasetStatus(name, namespace string, status sednav1.DatasetStatus) error {
	// Stamp the status when the reporter did not supply an update time.
	if status.UpdateTime == nil {
		stamp := metav1.Now()
		status.UpdateTime = &stamp
	}

	datasets := c.client.Datasets(namespace)

	writeStatus := func() error {
		current, getErr := datasets.Get(context.TODO(), name, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}

		current.Status = status
		_, updateErr := datasets.UpdateStatus(context.TODO(), current, metav1.UpdateOptions{})
		return updateErr
	}

	return runtime.RetryUpdateStatus(name, namespace, writeStatus)
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ package federatedlearning

import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
@@ -51,6 +52,9 @@ import (
)

const (
// KindName is the kind name of the custom resource (CR) this controller
// controls. It is used to build Kind and as the key under which
// updateFromEdge is added to the upstream controller.
KindName = "FederatedLearningJob"
// Name is the name of this controller.
Name = "FederatedLearning"
)

@@ -60,7 +64,7 @@ const (
)

// Kind contains the schema.GroupVersionKind for this controller type.
// NOTE(review): the two declarations below look like the removed/added pair
// of a diff hunk; presumably only the KindName form remains — confirm.
var Kind = sednav1.SchemeGroupVersion.WithKind("FederatedLearningJob")
var Kind = sednav1.SchemeGroupVersion.WithKind(KindName)

// Controller ensures that all FLJob objects have corresponding pods to
// run their configured workload.
@@ -531,9 +535,102 @@ func (c *Controller) createPod(job *sednav1.FederatedLearningJob) (active int32,
return
}

// updateModelMetrics stores metrics on the model used by the aggregation
// worker of the federated learning job namespace/jobName.
//
// The job is fetched once to resolve the model name. The model itself is then
// fetched and updated inside a closure run through runtime.RetryUpdateStatus,
// which sets Status.Metrics and stamps Status.UpdateTime with the current time
// on every attempt.
func (c *Controller) updateModelMetrics(jobName, namespace string, metrics []sednav1.Metric) error {
	job, err := c.client.FederatedLearningJobs(namespace).Get(context.TODO(), jobName, metav1.GetOptions{})
	if err != nil {
		// The federated learning job could not be fetched, so there is no
		// model to update.
		return err
	}

	models := c.client.Models(namespace)
	modelName := job.Spec.AggregationWorker.Model.Name

	applyMetrics := func() error {
		current, getErr := models.Get(context.TODO(), modelName, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}

		stamp := metav1.Now()
		current.Status.Metrics = metrics
		current.Status.UpdateTime = &stamp

		_, updateErr := models.UpdateStatus(context.TODO(), current, metav1.UpdateOptions{})
		return updateErr
	}

	return runtime.RetryUpdateStatus(modelName, namespace, applyMetrics)
}

// appendStatusCondition appends cond to the status conditions of the
// federated learning job namespace/name.
//
// The job is re-fetched on every attempt made by runtime.RetryUpdateStatus,
// so the condition is appended to the conditions of the fetched object.
func (c *Controller) appendStatusCondition(name, namespace string, cond sednav1.FLJobCondition) error {
	jobs := c.client.FederatedLearningJobs(namespace)

	appendCond := func() error {
		current, getErr := jobs.Get(context.TODO(), name, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}

		current.Status.Conditions = append(current.Status.Conditions, cond)
		_, updateErr := jobs.UpdateStatus(context.TODO(), current, metav1.UpdateOptions{})
		return updateErr
	}

	return runtime.RetryUpdateStatus(name, namespace, appendCond)
}

// updateFromEdge updates the federated job's status from a report sent by the edge.
//
// content is a JSON document with the optional fields "phase", "status" and
// "output". When "output" is present:
//   - the metrics of the first entry of "models" (only one model is expected)
//     are written to the job's model via updateModelMetrics;
//   - if "ownerInfo" reports a positive "currentRound", a training condition
//     describing that round is appended to the job status.
//
// Both updates are always attempted, so a failure to store metrics does not
// prevent the progress condition from being recorded. Any failure is returned
// to the caller instead of being silently dropped; when both updates fail the
// returned error carries both messages. The operation argument is not consulted.
func (c *Controller) updateFromEdge(name, namespace, operation string, content []byte) (err error) {
	// JobInfo defines the job information
	type JobInfo struct {
		// Current training round
		CurrentRound int    `json:"currentRound"`
		UpdateTime   string `json:"updateTime"`
	}

	// Output defines job output information
	type Output struct {
		Models  []runtime.Model `json:"models"`
		JobInfo *JobInfo        `json:"ownerInfo"`
	}

	var status struct {
		Phase  string  `json:"phase"`
		Status string  `json:"status"`
		Output *Output `json:"output"`
	}

	if err = json.Unmarshal(content, &status); err != nil {
		return err
	}

	output := status.Output
	if output == nil {
		// Nothing to record without output information.
		return nil
	}

	var metricsErr, condErr error

	// Update the model's metrics
	if len(output.Models) > 0 {
		// only one model
		metrics := runtime.ConvertMapToMetrics(output.Models[0].Metrics)
		if len(metrics) > 0 {
			if metricsErr = c.updateModelMetrics(name, namespace, metrics); metricsErr != nil {
				metricsErr = fmt.Errorf("failed to update model metrics, err:%w", metricsErr)
			}
		}
	}

	// update job info if having any info
	if jobInfo := output.JobInfo; jobInfo != nil && jobInfo.CurrentRound > 0 {
		// Find a good place to save the progress info
		// TODO: more meaningful reason/message
		reason := "DoTraining"
		message := fmt.Sprintf("Round %v reaches at %s", jobInfo.CurrentRound, jobInfo.UpdateTime)
		cond := NewFLJobCondition(sednav1.FLJobCondTraining, reason, message)
		if condErr = c.appendStatusCondition(name, namespace, cond); condErr != nil {
			condErr = fmt.Errorf("failed to append condition, err:%w", condErr)
		}
	}

	switch {
	case metricsErr != nil && condErr != nil:
		// Both updates failed: report both messages in one error.
		return fmt.Errorf("%v; %v", metricsErr, condErr)
	case metricsErr != nil:
		return metricsErr
	default:
		return condErr
	}
}

// New creates a new federated learning job controller that keeps the relevant pods
// in sync with their corresponding FederatedLearningJob objects.
func New(cfg *config.ControllerConfig) (runtime.FeatureControllerI, error) {
func New(controllerContext *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
cfg := controllerContext.Config
namespace := cfg.Namespace
if namespace == "" {
namespace = metav1.NamespaceAll
@@ -585,5 +682,8 @@ func New(cfg *config.ControllerConfig) (runtime.FeatureControllerI, error) {
stopCh := make(chan struct{})
kubeInformerFactory.Start(stopCh)
jobInformerFactory.Start(stopCh)

controllerContext.UpstreamController.Add(KindName, fc.updateFromEdge)

return fc, err
}
Original file line number Diff line number Diff line change
@@ -53,11 +53,15 @@ import (
)

const (
// NOTE(review): this first Name entry looks like the removed line of the diff
// hunk (Name is declared again below) — confirm it is gone in the final file.
Name = "IncrementalLearningJob"
// KindName is the kind name of the custom resource (CR) this controller
// controls. It is used to build Kind and as the key under which
// updateFromEdge is added to the upstream controller.
KindName = "IncrementalLearningJob"

// Name is the name of this controller.
Name = "IncrementalLearning"
)

// Kind contains the schema.GroupVersionKind for this controller type.
// NOTE(review): the two declarations below look like the removed/added pair
// of a diff hunk; presumably only the KindName form remains — confirm.
var Kind = sednav1.SchemeGroupVersion.WithKind("IncrementalLearningJob")
var Kind = sednav1.SchemeGroupVersion.WithKind(KindName)

// Controller ensures that all IncrementalLearningJob objects have corresponding pods to
// run their configured workload.
@@ -815,9 +819,83 @@ func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error {
return err
}

func (c *Controller) appendStatusCondition(name, namespace string, cond sednav1.ILJobCondition) error {
Copy link

@ugvddm ugvddm Aug 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to extract appendStatusCondition into a common function? Please check the other functions as well; they seem to have the same issue.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but it's not trivial, since different CRs have different object clients that are hard to merge into one client, and it is not a problem introduced by this PR. So I'll leave it alone.

client := c.client.IncrementalLearningJobs(namespace)
return runtime.RetryUpdateStatus(name, namespace, (func() error {
job, err := client.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
job.Status.Conditions = append(job.Status.Conditions, cond)
_, err = client.UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})
return err
}))
}

// updateFromEdge syncs the edge updates to k8s.
//
// content is a JSON document reported by the edge. Its "phase" (train, eval or
// deploy) and "status" (ready, completed, failed or waiting) fields are
// matched case-insensitively and select the stage and type of a new
// ILJobCondition; any other value yields an error. The condition's Data field
// holds the payload re-encoded through runtime.IncrementalCondData. The
// condition is then appended to the status of the job namespace/name.
//
// An error is returned when content cannot be decoded, the condition data
// cannot be re-encoded, phase or status is not recognised, or the condition
// cannot be appended. The operation argument is not consulted.
func (c *Controller) updateFromEdge(name, namespace, operation string, content []byte) error {
	var jobStatus struct {
		Phase  string `json:"phase"`
		Status string `json:"status"`
	}

	if err := json.Unmarshal(content, &jobStatus); err != nil {
		return err
	}

	// Get the condition data.
	// Here unmarshal and marshal immediately to skip the unnecessary fields
	var condData runtime.IncrementalCondData
	if err := json.Unmarshal(content, &condData); err != nil {
		return err
	}
	condDataBytes, err := json.Marshal(&condData)
	if err != nil {
		// Do not store an empty or partial Data string if re-encoding fails.
		return fmt.Errorf("failed to marshal condition data, err:%w", err)
	}

	// Use one timestamp so the heartbeat and transition times of this single
	// report are identical.
	now := metav1.Now()
	cond := sednav1.ILJobCondition{
		Status:             v1.ConditionTrue,
		LastHeartbeatTime:  now,
		LastTransitionTime: now,
		Data:               string(condDataBytes),
		Message:            "reported by lc",
	}

	switch strings.ToLower(jobStatus.Phase) {
	case "train":
		cond.Stage = sednav1.ILJobTrain
	case "eval":
		cond.Stage = sednav1.ILJobEval
	case "deploy":
		cond.Stage = sednav1.ILJobDeploy
	default:
		return fmt.Errorf("invalid condition stage: %v", jobStatus.Phase)
	}

	switch strings.ToLower(jobStatus.Status) {
	case "ready":
		cond.Type = sednav1.ILJobStageCondReady
	case "completed":
		cond.Type = sednav1.ILJobStageCondCompleted
	case "failed":
		cond.Type = sednav1.ILJobStageCondFailed
	case "waiting":
		cond.Type = sednav1.ILJobStageCondWaiting
	default:
		return fmt.Errorf("invalid condition type: %v", jobStatus.Status)
	}

	if err := c.appendStatusCondition(name, namespace, cond); err != nil {
		return fmt.Errorf("failed to append condition, err:%+w", err)
	}
	return nil
}

// New creates a new IncrementalJob controller that keeps the relevant pods
// in sync with their corresponding IncrementalJob objects.
func New(cfg *config.ControllerConfig) (runtime.FeatureControllerI, error) {
func New(controllerContext *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
cfg := controllerContext.Config
namespace := cfg.Namespace
if namespace == "" {
namespace = metav1.NamespaceAll
@@ -880,5 +958,8 @@ func New(cfg *config.ControllerConfig) (runtime.FeatureControllerI, error) {
stopCh := make(chan struct{})
kubeInformerFactory.Start(stopCh)
jobInformerFactory.Start(stopCh)

controllerContext.UpstreamController.Add(KindName, jc.updateFromEdge)

return jc, err
}
Loading