diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 736a7b073..e9a403fc1 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -21,7 +21,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - name: Checkout code uses: actions/checkout@v2 @@ -57,7 +57,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -78,7 +78,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -102,7 +102,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -138,7 +138,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -159,7 +159,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -171,4 +171,4 @@ jobs: with: fetch-depth: 0 - - run: make docker-cross-build \ No newline at end of file + - run: make docker-cross-build diff --git a/build/gm/Dockerfile b/build/gm/Dockerfile index 5c1b03ce5..3b31fda9f 100644 --- a/build/gm/Dockerfile +++ b/build/gm/Dockerfile @@ -14,7 +14,7 @@ # Add cross buildx improvement # _speed_buildx_for_go_ -FROM golang:1.14-alpine3.11 AS builder +FROM golang:1.16-alpine3.15 AS builder LABEL stage=builder ARG GO_LDFLAGS diff --git a/build/lc/Dockerfile b/build/lc/Dockerfile index e96c429da..71b4f7aae 100644 --- a/build/lc/Dockerfile +++ b/build/lc/Dockerfile @@ -15,7 +15,7 @@ # Add cross buildx improvement # LC has built sqlite3 which requires CGO with CGO_ENABLED=1 # _speed_buildx_for_cgo_alpine_ -FROM golang:1.14-alpine3.11 AS builder +FROM golang:1.16-alpine3.15 AS builder LABEL stage=builder ARG GO_LDFLAGS diff --git a/docs/api/lib/index.rst b/docs/api/lib/index.rst index 15569e51d..d0628f67c 100644 --- a/docs/api/lib/index.rst +++ b/docs/api/lib/index.rst @@ -1,5 +1,5 @@ =========================================== -Sedna Python SDK +Python API Use Guide =========================================== .. mdinclude:: ../../../lib/sedna/README.md \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index a932286f8..8808b56bb 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -58,7 +58,7 @@ # -- Project information ----------------------------------------------------- project = 'Sedna' -copyright = '2020, Kubeedge' +copyright = '2021, Kubeedge' author = 'Kubeedge' version = __version__ diff --git a/docs/index.rst b/docs/index.rst index b9739ff71..6def8ac57 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,11 +9,24 @@ Sedna is an edge-cloud synergy AI project incubated in KubeEdge SIG AI. Benefiti Sedna can simply enable edge-cloud synergy capabilities to existing training and inference scripts, bringing the benefits of reducing costs, improving model performance, and protecting data privacy. + .. toctree:: :maxdepth: 1 - :caption: QUICK START + :caption: GUIDE - quickstart + index/guide + index/quickstart + + +.. toctree:: + :maxdepth: 1 + :titlesonly: + :glob: + :caption: DEPLOY + + Cluster Installation (for production) + AllinOne Installation (for development) + Standalone Installation (for hello world) .. toctree:: @@ -31,34 +44,28 @@ Sedna can simply enable edge-cloud synergy capabilities to existing training and proposals/object-tracking -.. toctree:: - :maxdepth: 1 - :titlesonly: - :glob: - :caption: DEPLOY - - Installtion - Standalone - .. toctree:: :maxdepth: 1 :glob: :caption: EXAMPLES - examples/federated_learning/surface_defect_detection/README - examples/incremental_learning/helmet_detection/README examples/joint_inference/helmet_detection_inference/README + examples/incremental_learning/helmet_detection/README + examples/federated_learning/surface_defect_detection/README + examples/federated_learning/yolov5_coco128_mistnet/README examples/lifelong_learning/atcii/README examples/storage/s3/* .. toctree:: :maxdepth: 1 - :caption: API + :caption: API REFERENCE :titlesonly: :glob: api/lib/* + Python API + .. toctree:: :maxdepth: 1 @@ -69,26 +76,17 @@ Sedna can simply enable edge-cloud synergy capabilities to existing training and Control Plane -.. toctree:: - :maxdepth: 1 - :caption: API REFERENCE - :titlesonly: - :glob: - - autoapi/lib/sedna/index - - .. toctree:: :caption: ROADMAP :hidden: - roadmap + index/roadmap RELATED LINKS ============= -.. mdinclude:: related_link.md +.. mdinclude:: index/related_link.md Indices and tables ================== diff --git a/docs/index/guide.md b/docs/index/guide.md new file mode 100644 index 000000000..155fc1f48 --- /dev/null +++ b/docs/index/guide.md @@ -0,0 +1,24 @@ +## Guide +- If you are new to Sedna, you can try the command step by step in [quick start](./quickstart.md). +- If you have played the above example, you can find more [examples](/examples/README.md). +- If you want to know more about sedna's architecture and component, you can find them in [sedna home]. +- If you're looking to contribute documentation improvements, you'll specifically want to see the [kubernetes documentation style guide] before [filing an issue][file-an-issue]. +- If you're planning to contribute code changes, you'll want to read the [development preparation guide] next. +- If you're planning to add a new synergy feature directly, you'll want to read the [guide][add-feature-guide] next. + + +[proposals]: /docs/proposals +[development preparation guide]: ../contributing/prepare-environment.md +[add-feature-guide]: ../contributing/control-plane/add-a-new-synergy-feature.md + +[sedna home]: https://github.com/kubeedge/sedna +[issues]: https://github.com/kubeedge/sedna/issues +[file-an-issue]: https://github.com/kubeedge/sedna/issues/new/choose +[file-a-fr]: https://github.com/kubeedge/sedna/issues/new?labels=kind%2Ffeature&template=enhancement.md + +[github]: https://github.com/ +[kubernetes documentation style guide]: https://github.com/kubernetes/community/blob/master/contributors/guide/style-guide.md +[recommended Git workflow]: https://github.com/kubernetes/community/blob/master/contributors/guide/github-workflow.md#workflow +[pull request best practices]: https://github.com/kubernetes/community/blob/master/contributors/guide/pull-requests.md#best-practices-for-faster-reviews +[Kubernetes help wanted]: https://www.kubernetes.dev/docs/guide/help-wanted/ + diff --git a/docs/setup/quick-start.md b/docs/index/quick-start.md similarity index 100% rename from docs/setup/quick-start.md rename to docs/index/quick-start.md diff --git a/docs/index/quickstart.md b/docs/index/quickstart.md new file mode 100644 index 000000000..e3c6c0cd5 --- /dev/null +++ b/docs/index/quickstart.md @@ -0,0 +1,183 @@ + +# Quick Start + + +The following is showing how to run a joint inference job by sedna. +## Quick Start + +#### 0. Check the Environment + +For Sedna all-in-one installation, it requires you: + - 1 VM **(one machine is OK, cluster is not required)** + - 2 CPUs or more + - 2GB+ free memory, depends on node number setting + - 10GB+ free disk space + - Internet connection(docker hub, github etc.) + - Linux platform, such as ubuntu/centos + - Docker 17.06+ + +you can check the docker version by the following command, +```bash +docker -v +``` +after doing that, the output will be like this, that means your version fits the bill. +``` +Docker version 19.03.6, build 369ce74a3c +``` + + +#### 1. Deploy Sedna +Sedna provides three deployment methods, which can be selected according to your actual situation: + +- [Install Sedna AllinOne](setup/all-in-one.md). (used for development, here we use it) +- [Install Sedna local up](setup/local-up.md). +- [Install Sedna on a cluster](setup/install.md). + +The [all-in-one script](/scripts/installation/all-in-one.sh) is used to install Sedna along with a mini Kubernetes environment locally, including: + - A Kubernetes v1.21 cluster with multi worker nodes, default zero worker node. + - KubeEdge with multi edge nodes, default is latest KubeEdge and one edge node. + - Sedna, default is the latest version. + + ```bash + curl https://raw.githubusercontent.com/kubeedge/sedna/master/scripts/installation/all-in-one.sh | NUM_EDGE_NODES=1 bash - + ``` + +Then you get two nodes `sedna-mini-control-plane` and `sedna-mini-edge0`,you can get into each node by following command: + +```bash +# get into cloud node +docker exec -it sedna-mini-control-plane bash +``` + +```bash +# get into edge node +docker exec -it sedna-mini-edge0 bash +``` + +#### 1. Prepare Data and Model File + +* step1: download [little model](https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/little-model.tar.gz) to your edge node. + +``` +mkdir -p /data/little-model +cd /data/little-model +wget https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/little-model.tar.gz +tar -zxvf little-model.tar.gz +``` + +* step2: download [big model](https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/big-model.tar.gz) to your cloud node. + +``` +mkdir -p /data/big-model +cd /data/big-model +wget https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/big-model.tar.gz +tar -zxvf big-model.tar.gz +``` + +#### 2. Create Big Model Resource Object for Cloud +In cloud node: +``` +kubectl create -f - <" - threshold: 500 - metric: num_of_samples - evalSpec: - template: - spec: - nodeName: $WORKER_NODE - containers: - - image: $IMAGE - name: eval-worker - imagePullPolicy: IfNotPresent - args: ["eval.py"] - env: - - name: "input_shape" - value: "352,640" - - name: "class_names" - value: "person,helmet,helmet-on,helmet-off" - deploySpec: - model: - name: "deploy-model" - hotUpdateEnabled: true - pollPeriodSeconds: 60 - trigger: - condition: - operator: ">" - threshold: 0.1 - metric: precision_delta - hardExampleMining: - name: "IBT" - parameters: - - key: "threshold_img" - value: "0.9" - - key: "threshold_box" - value: "0.9" - template: - spec: - nodeName: $WORKER_NODE - containers: - - image: $IMAGE - name: infer-worker - imagePullPolicy: IfNotPresent - args: ["inference.py"] - env: - - name: "input_shape" - value: "352,640" - - name: "video_url" - value: "file://video/video.mp4" - - name: "HE_SAVED_URL" - value: "/he_saved_url" - volumeMounts: - - name: localvideo - mountPath: /video/ - - name: hedir - mountPath: /he_saved_url - resources: # user defined resources - limits: - memory: 2Gi - volumes: # user defined volumes - - name: localvideo - hostPath: - path: /incremental_learning/video/ - type: DirectoryorCreate - - name: hedir - hostPath: - path: /incremental_learning/he/ - type: DirectoryorCreate - outputDir: "/output" -EOF -``` - -1. The `Dataset` describes data with labels and `HE_SAVED_URL` indicates the address of the deploy container for uploading hard examples. Users will mark label for the hard examples in the address. -2. Ensure that the path of outputDir in the YAML file exists on your node. This path will be directly mounted to the container. - - -### 5. Monitor - -### Check Incremental Learning Job -Query the service status: - -``` -kubectl get incrementallearningjob helmet-detection-demo -``` - -In the `IncrementalLearningJob` resource helmet-detection-demo, the following trigger is configured: - -``` -trigger: - checkPeriodSeconds: 60 - timer: - start: 02:00 - end: 20:00 - condition: - operator: ">" - threshold: 500 - metric: num_of_samples -``` - -## API - -- control-plane: Please refer to this [link](api/crd). -- Lib: Please refer to this [link](api/lib). - -## Contributing - -Contributions are very welcome! - -- control-plane: Please refer to this [link](contributing/control-plane/development.md). -- Lib: Please refer to this [link](contributing/lib/development.md). - -## Community - -Sedna is an open source project and in the spirit of openness and freedom, we welcome new contributors to join us. -You can get in touch with the community according to the ways: -* [Github Issues](https://github.com/kubeedge/sedna/issues) -* [Regular Community Meeting](https://zoom.us/j/4167237304) -* [slack channel](https://app.slack.com/client/TDZ5TGXQW/C01EG84REVB/details) - diff --git a/docs/related_link.md b/docs/related_link.md deleted file mode 100644 index 5e8e07565..000000000 --- a/docs/related_link.md +++ /dev/null @@ -1,9 +0,0 @@ -[支持边云协同终身学习特性,KubeEdge 子项目 Sedna 0.3.0 版本发布!](https://juejin.cn/post/6970866022286884878) - -[【HDC.Cloud 2021】边云协同,打通AI最后一公里](https://xie.infoq.cn/article/b22e72afe8de50ca34269bb21) - -[KubeEdge Sedna如何实现边缘AI模型精度提升50%](https://www.huaweicloud.com/zhishi/hdc2021-Track-24-18.html) - -[KubeEdge子项目Sedna 0.1版本发布!支持边云协同增量学习、联邦学习、协同推理](https://mp.weixin.qq.com/s/3Ei8ynSAxnfuoIWYdb7Gpg) - -[加速AI边云协同创新!KubeEdge社区建立Sedna子项目](https://cloud.tencent.com/developer/article/1791739) diff --git a/docs/setup/install.md b/docs/setup/install.md index 19607f313..f5c1feb38 100644 --- a/docs/setup/install.md +++ b/docs/setup/install.md @@ -4,7 +4,7 @@ For interested readers, Sedna also has two important components that would be me If you don't have an existing Kubernetes, you can: 1) Install Kubernetes by following the [Kubernetes website](https://kubernetes.io/docs/setup/). -2) Or follow [quick start](quick-start.md) for other options. +2) Or follow [quick start](index/quick-start.md) for other options. ### Prerequisites - [Kubectl][kubectl] with right kubeconfig diff --git a/examples/incremental_learning/helmet_detection/helmet_detection.yaml b/examples/incremental_learning/helmet_detection/helmet_detection.yaml new file mode 100644 index 000000000..2561be3a8 --- /dev/null +++ b/examples/incremental_learning/helmet_detection/helmet_detection.yaml @@ -0,0 +1,105 @@ +apiVersion: sedna.io/v1alpha1 +kind: IncrementalLearningJob +metadata: + name: helmet-detection-demo +spec: + initialModel: + name: "initial-model" + dataset: + name: "incremental-dataset" + trainProb: 0.8 + trainSpec: + template: + spec: + nodeName: $WORKER_NODE + containers: + - image: $IMAGE + name: train-worker + imagePullPolicy: IfNotPresent + args: [ "train.py" ] + env: + - name: "batch_size" + value: "32" + - name: "epochs" + value: "1" + - name: "input_shape" + value: "352,640" + - name: "class_names" + value: "person,helmet,helmet-on,helmet-off" + - name: "nms_threshold" + value: "0.4" + - name: "obj_threshold" + value: "0.3" + trigger: + checkPeriodSeconds: 60 + timer: + start: 02:00 + end: 20:00 + condition: + operator: ">" + threshold: 500 + metric: num_of_samples + evalSpec: + template: + spec: + nodeName: $WORKER_NODE + containers: + - image: $IMAGE + name: eval-worker + imagePullPolicy: IfNotPresent + args: [ "eval.py" ] + env: + - name: "input_shape" + value: "352,640" + - name: "class_names" + value: "person,helmet,helmet-on,helmet-off" + deploySpec: + model: + name: "deploy-model" + hotUpdateEnabled: true + pollPeriodSeconds: 60 + trigger: + condition: + operator: ">" + threshold: 0.1 + metric: precision_delta + hardExampleMining: + name: "IBT" + parameters: + - key: "threshold_img" + value: "0.9" + - key: "threshold_box" + value: "0.9" + template: + spec: + nodeName: $WORKER_NODE + containers: + - image: $IMAGE + name: infer-worker + imagePullPolicy: IfNotPresent + args: [ "inference.py" ] + env: + - name: "input_shape" + value: "352,640" + - name: "video_url" + value: "file://video/video.mp4" + - name: "HE_SAVED_URL" + value: "/he_saved_url" + volumeMounts: + - name: localvideo + mountPath: /video/ + - name: hedir + mountPath: /he_saved_url + resources: # user defined resources + limits: + memory: 2Gi + volumes: # user defined volumes + - name: localvideo + hostPath: + path: /incremental_learning/video/ + type: DirectoryorCreate + - name: hedir + hostPath: + path: /incremental_learning/he/ + type: DirectoryorCreate + outputDir: "/output" \ No newline at end of file diff --git a/examples/joint_inference/helmet_detection_inference/helmet_detection_inference.yaml b/examples/joint_inference/helmet_detection_inference/helmet_detection_inference.yaml new file mode 100644 index 000000000..9ecb44301 --- /dev/null +++ b/examples/joint_inference/helmet_detection_inference/helmet_detection_inference.yaml @@ -0,0 +1,66 @@ +apiVersion: sedna.io/v1alpha1 +kind: JointInferenceService +metadata: + name: helmet-detection-inference-example + namespace: default +spec: + edgeWorker: + model: + name: "helmet-detection-inference-little-model" + hardExampleMining: + name: "IBT" + parameters: + - key: "threshold_img" + value: "0.9" + - key: "threshold_box" + value: "0.9" + template: + spec: + nodeName: $EDGE_NODE + containers: + - image: kubeedge/sedna-example-joint-inference-helmet-detection-little:v0.3.0 + imagePullPolicy: IfNotPresent + name: little-model + env: # user defined environments + - name: input_shape + value: "416,736" + - name: "video_url" + value: "rtsp://localhost/video" + - name: "all_examples_inference_output" + value: "/data/output" + - name: "hard_example_cloud_inference_output" + value: "/data/hard_example_cloud_inference_output" + - name: "hard_example_edge_inference_output" + value: "/data/hard_example_edge_inference_output" + resources: # user defined resources + requests: + memory: 64M + cpu: 100m + limits: + memory: 2Gi + volumeMounts: + - name: outputdir + mountPath: /data/ + volumes: # user defined volumes + - name: outputdir + hostPath: + # user must create the directory in host + path: /joint_inference/output + type: Directory + + cloudWorker: + model: + name: "helmet-detection-inference-big-model" + template: + spec: + nodeName: $CLOUD_NODE + containers: + - image: kubeedge/sedna-example-joint-inference-helmet-detection-big:v0.3.0 + name: big-model + imagePullPolicy: IfNotPresent + env: # user defined environments + - name: "input_shape" + value: "544,544" + resources: # user defined resources + requests: + memory: 2Gi diff --git a/go.mod b/go.mod index 2f41d6c5d..a1db6d821 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/kubeedge/sedna -go 1.14 +go 1.16 require ( github.com/emicklei/go-restful/v3 v3.4.0 diff --git a/pkg/globalmanager/controllers/incrementallearning/downstream.go b/pkg/globalmanager/controllers/incrementallearning/downstream.go index 84be05287..d323ff602 100644 --- a/pkg/globalmanager/controllers/incrementallearning/downstream.go +++ b/pkg/globalmanager/controllers/incrementallearning/downstream.go @@ -62,18 +62,15 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro // more details at https://github.com/kubernetes/kubernetes/issues/3030 job.Kind = KindName - jobConditions := job.Status.Conditions - if len(jobConditions) == 0 { - return nil - } - dataName := job.Spec.Dataset.Name + // LC has dataset object on this node that may call dataset node + var dsNodeName string ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("dataset(%s/%s) not found", job.Namespace, dataName) + klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err) + } else { + dsNodeName = ds.Spec.NodeName } - // LC has dataset object on this node that may call dataset node - dsNodeName := ds.Spec.NodeName var trainNodeName string var evalNodeName string @@ -102,6 +99,15 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro return nil } + if dsNodeName == "" { + return nil + } + + jobConditions := job.Status.Conditions + if len(jobConditions) == 0 { + return nil + } + latestCondition := jobConditions[len(jobConditions)-1] currentType := latestCondition.Type jobStage := latestCondition.Stage diff --git a/pkg/globalmanager/controllers/lifelonglearning/downstream.go b/pkg/globalmanager/controllers/lifelonglearning/downstream.go index 652bc79b1..eee7a306c 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/downstream.go +++ b/pkg/globalmanager/controllers/lifelonglearning/downstream.go @@ -17,9 +17,12 @@ limitations under the License. package lifelonglearning import ( - "fmt" + "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/globalmanager/runtime" @@ -36,17 +39,95 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro // more details at https://github.com/kubernetes/kubernetes/issues/3030 job.Kind = KindName - // Here only propagate to the nodes with non empty name + dataName := job.Spec.Dataset.Name + // LC has dataset object on this node that may call dataset node + var dsNodeName string + ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err) + } else { + dsNodeName = ds.Spec.NodeName + } + + var trainNodeName string + var evalNodeName string + var deployNodeName string - // FIXME(llhuii): only the case that all workers having the same nodeName are support, - // will support Spec.NodeSelector and different nodeName. - nodeName := job.Spec.TrainSpec.Template.Spec.NodeName - if len(nodeName) == 0 { - return fmt.Errorf("empty node name") + getAnnotationsNodeName := func(nodeName sednav1.LLJobStage) string { + return runtime.AnnotationsKeyPrefix + string(nodeName) + } + ann := job.GetAnnotations() + if ann != nil { + trainNodeName = ann[getAnnotationsNodeName(sednav1.LLJobTrain)] + evalNodeName = ann[getAnnotationsNodeName(sednav1.LLJobEval)] + deployNodeName = ann[getAnnotationsNodeName(sednav1.LLJobDeploy)] + } + + if eventType == watch.Deleted { + // delete jobs from all LCs + nodes := sets.NewString(dsNodeName, trainNodeName, evalNodeName, deployNodeName) + + for node := range nodes { + c.sendToEdgeFunc(node, eventType, job) + } + + return nil + } + + if dsNodeName == "" { + return nil + } + + jobConditions := job.Status.Conditions + if len(jobConditions) == 0 { + return nil + } + + latestCondition := jobConditions[len(jobConditions)-1] + currentType := latestCondition.Type + jobStage := latestCondition.Stage + + syncJobWithNodeName := func(nodeName string) { + if err := c.sendToEdgeFunc(nodeName, eventType, job); err != nil { + klog.Warningf("Error to sync lifelong learning job %s to node %s in stage %s: %v", + job.Name, nodeName, jobStage, err) + } } runtime.InjectSecretAnnotations(c.kubeClient, job, job.Spec.CredentialName) - return c.sendToEdgeFunc(nodeName, eventType, job) + + // isJobResidentNode checks whether nodeName is a job resident node + isJobResidentNode := func(nodeName string) bool { + // the node where LC monitors dataset and the node where inference worker is running are job resident node + if nodeName == dsNodeName || nodeName == deployNodeName { + return true + } + return false + } + + doJobStageEvent := func(nodeName string) { + if currentType == sednav1.LLJobStageCondWaiting { + syncJobWithNodeName(dsNodeName) + } else if currentType == sednav1.LLJobStageCondRunning { + syncJobWithNodeName(nodeName) + } else if currentType == sednav1.LLJobStageCondCompleted || currentType == sednav1.LLJobStageCondFailed { + if !isJobResidentNode(nodeName) { + // delete LC's job from nodeName that's different from dataset node when worker's status is completed or failed. + c.sendToEdgeFunc(nodeName, watch.Deleted, job) + } + } + } + + switch jobStage { + case sednav1.LLJobTrain: + doJobStageEvent(trainNodeName) + case sednav1.LLJobEval: + doJobStageEvent(evalNodeName) + case sednav1.LLJobDeploy: + doJobStageEvent(deployNodeName) + } + + return nil } func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error { diff --git a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go index 4c3a2f690..c4e8c32ff 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go @@ -18,13 +18,17 @@ package lifelonglearning import ( "context" + "crypto/sha256" + "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/types" "strings" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + lruexpirecache "k8s.io/apimachinery/pkg/util/cache" utilrand "k8s.io/apimachinery/pkg/util/rand" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -50,6 +54,8 @@ const ( KindName = "LifelongLearningJob" // Name is this controller name Name = "LifelongLearning" + // VirtualKubeletNode is virtual node + VirtualKubeletNode = "virtual-kubelet" ) // Kind contains the schema.GroupVersionKind for this controller type. @@ -80,6 +86,8 @@ type Controller struct { cfg *config.ControllerConfig sendToEdgeFunc runtime.DownstreamSendFunc + + lruExpireCache *lruexpirecache.LRUExpireCache } // Run starts the main goroutine responsible for watching and syncing jobs. @@ -294,10 +302,51 @@ func (c *Controller) sync(key string) (bool, error) { return forget, err } +// setWorkerNodeNameOfJob sets the worker nodeName of the specified job +// which is used for downstream to sync job info to the specified LC located in nodeName. +func (c *Controller) setWorkerNodeNameOfJob(job *sednav1.LifelongLearningJob, jobStage string, nodeName string) error { + key := runtime.AnnotationsKeyPrefix + jobStage + + return c.addJobAnnotations(job, key, nodeName) +} + +// addJobAnnotations adds info in job annotations +func (c *Controller) addJobAnnotations(job *sednav1.LifelongLearningJob, key string, value string) error { + ann := job.GetAnnotations() + if ann[key] == value { + // already set + return nil + } + + patchData := metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{key: value}}} + + patchDataBytes, err := json.Marshal(&patchData) + if err != nil { + return err + } + + jobClient := c.client.LifelongLearningJobs(job.Namespace) + return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error { + newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + annotations := newJob.GetAnnotations() + if annotations[key] == value { + return nil + } + + _, err = jobClient.Patch(context.TODO(), job.Name, types.MergePatchType, patchDataBytes, metav1.PatchOptions{}) + return err + }) +} + // transitJobState transit job to next state func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, error) { var initialType sednav1.LLJobStageConditionType - var latestCondition = sednav1.LLJobCondition{ + var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{ Stage: sednav1.LLJobTrain, Type: initialType, } @@ -305,14 +354,16 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er var newConditionType sednav1.LLJobStageConditionType var needUpdated = false - var podStatus = v1.PodUnknown + var podStatus v1.PodPhase = v1.PodUnknown + var pod *v1.Pod + jobConditions := job.Status.Conditions if len(jobConditions) > 0 { // get latest pod and pod status latestCondition = (jobConditions)[len(jobConditions)-1] klog.V(2).Infof("lifelonglearning job %v/%v latest stage %v:", job.Namespace, job.Name, latestCondition.Stage) - pod := c.getSpecifiedPods(job, string(latestCondition.Stage)) + pod = c.getSpecifiedPods(job, string(latestCondition.Stage)) if pod != nil { podStatus = pod.Status.Phase @@ -334,32 +385,36 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er // include train, eval, deploy pod var err error if jobStage == sednav1.LLJobDeploy { - err = c.restartInferPod(job) - if err != nil { - klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) - } else { + if !c.hasJobInCache(job) { + err = c.restartInferPod(job) + if err != nil { + klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) + return needUpdated, err + } + klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) + newConditionType = sednav1.LLJobStageCondCompleted + c.addJobToCache(job) } - } else if podStatus != v1.PodPending && podStatus != v1.PodRunning { - err = c.createPod(job, jobStage) - } - if err != nil { - return needUpdated, err + } else { + if podStatus != v1.PodPending && podStatus != v1.PodRunning { + err = c.createPod(job, jobStage) + if err != nil { + return needUpdated, err + } + } + newConditionType = sednav1.LLJobStageCondStarting } - newConditionType = sednav1.LLJobStageCondStarting case sednav1.LLJobStageCondStarting, sednav1.LLJobStageCondRunning: if podStatus == v1.PodRunning { - if jobStage == sednav1.LLJobDeploy { - newConditionType = sednav1.LLJobStageCondCompleted - } else { - // watch pod status, if pod running, set type running - newConditionType = sednav1.LLJobStageCondRunning + // add nodeName to job + if err := c.setWorkerNodeNameOfJob(job, string(jobStage), pod.Spec.NodeName); err != nil { + return needUpdated, err } - } else if podStatus == v1.PodSucceeded { - // watch pod status, if pod completed, set type completed - newConditionType = sednav1.LLJobStageCondCompleted - klog.V(2).Infof("lifelonglearning job %v/%v %v stage completed!", job.Namespace, job.Name, jobStage) + + // watch pod status, if pod running, set type running + newConditionType = sednav1.LLJobStageCondRunning } else if podStatus == v1.PodFailed { newConditionType = sednav1.LLJobStageCondFailed klog.V(2).Infof("lifelonglearning job %v/%v %v stage failed!", job.Namespace, job.Name, jobStage) @@ -441,6 +496,21 @@ func (c *Controller) getSpecifiedPods(job *sednav1.LifelongLearningJob, podType return latestPod } +func (c *Controller) getHas256(target interface{}) string { + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%v", target))) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func (c *Controller) addJobToCache(job *sednav1.LifelongLearningJob) { + c.lruExpireCache.Add(c.getHas256(job.Status), job, 10*time.Second) +} + +func (c *Controller) hasJobInCache(job *sednav1.LifelongLearningJob) bool { + _, ok := c.lruExpireCache.Get(c.getHas256(job.Status)) + return ok +} + func (c *Controller) restartInferPod(job *sednav1.LifelongLearningJob) error { inferPod := c.getSpecifiedPods(job, runtime.InferencePodType) if inferPod == nil { @@ -492,6 +562,18 @@ func IsJobFinished(j *sednav1.LifelongLearningJob) bool { return false } +func (c *Controller) addPodAnnotations(spec *v1.PodTemplateSpec, key string, value string) { + ann := spec.GetAnnotations() + if ann == nil { + ann = make(map[string]string) + } + + if _, ok := ann[key]; !ok { + ann[key] = value + spec.SetAnnotations(ann) + } +} + func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1.LLJobStage) (err error) { ctx := context.Background() var podTemplate *v1.PodTemplateSpec @@ -541,13 +623,21 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 originalDataURLOrIndex = dataset.Spec.URL } - var workerParam = new(runtime.WorkerParam) + var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) + if podtype == sednav1.LLJobTrain { - workerParam.WorkerType = "Train" + workerParam.WorkerType = runtime.TrainPodType podTemplate = &job.Spec.TrainSpec.Template // Env parameters for train + c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType) + c.addPodAnnotations(podTemplate, "data", dataURL) + datasetUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + datasetUseInitializer = false + } + workerParam.Env = map[string]string{ "NAMESPACE": job.Namespace, "JOB_NAME": job.Name, @@ -571,7 +661,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ URL: dataURL, Secret: jobSecret, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, EnvName: "TRAIN_DATASET_URL", }, @@ -582,14 +672,25 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 Secret: datasetSecret, URL: originalDataURLOrIndex, Indirect: dataset.Spec.URL != originalDataURLOrIndex, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, EnvName: "ORIGINAL_DATASET_URL", }, ) } else { podTemplate = &job.Spec.EvalSpec.Template - workerParam.WorkerType = "Eval" + workerParam.WorkerType = runtime.EvalPodType + + c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType) + c.addPodAnnotations(podTemplate, "data", dataURL) + datasetUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + datasetUseInitializer = false + } + modelUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + modelUseInitializer = false + } // Configure Env information for eval by initial WorkerParam workerParam.Env = map[string]string{ @@ -606,7 +707,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 modelMountURLs = append(modelMountURLs, runtime.MountURL{ URL: url, Secret: jobSecret, - DownloadByInitializer: true, + DownloadByInitializer: modelUseInitializer, }) } workerParam.Mounts = append(workerParam.Mounts, @@ -629,7 +730,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ URL: dataURL, Secret: datasetSecret, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, Name: "datasets", EnvName: "TEST_DATASET_URL", @@ -639,7 +740,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ Secret: datasetSecret, URL: originalDataURLOrIndex, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, Indirect: dataset.Spec.URL != originalDataURLOrIndex, }, Name: "origin-dataset", @@ -672,7 +773,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error { return err } - var workerParam = new(runtime.WorkerParam) + var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) workerParam.Mounts = append(workerParam.Mounts, runtime.WorkerMount{ URL: &runtime.MountURL{ @@ -694,6 +795,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error { } workerParam.WorkerType = runtime.InferencePodType + c.addPodAnnotations(&job.Spec.DeploySpec.Template, "type", workerParam.WorkerType) workerParam.HostNetwork = true // create edge pod @@ -714,10 +816,11 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")}) jc := &Controller{ - kubeClient: cc.KubeClient, - client: cc.SednaClient.SednaV1alpha1(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), - cfg: cfg, + kubeClient: cc.KubeClient, + client: cc.SednaClient.SednaV1alpha1(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), + cfg: cfg, + lruExpireCache: lruexpirecache.NewLRUExpireCache(10), } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go index 114ccff0e..a5763fc6d 100644 --- a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go @@ -20,17 +20,21 @@ import ( "bufio" "encoding/json" "fmt" + "io/ioutil" "os" "path" + "path/filepath" "strconv" "strings" "sync" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/kubeedge/sedna/cmd/sedna-lc/app/options" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" + gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/lifelonglearning" "github.com/kubeedge/sedna/pkg/globalmanager/runtime" "github.com/kubeedge/sedna/pkg/localcontroller/db" clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" @@ -42,20 +46,26 @@ import ( ) const ( + // JobIterationIntervalSeconds is interval time of each iteration of job + JobIterationIntervalSeconds = 10 + // DatasetHandlerIntervalSeconds is interval time of handling dataset + DatasetHandlerIntervalSeconds = 10 + // EvalSamplesCapacity is capacity of eval samples + EvalSamplesCapacity = 5 //KindName is kind of lifelong-learning-job resource KindName = "lifelonglearningjob" - // TrainPhase is the train phase - TrainPhase = "train" - // EvalPhase is the eval phase - EvalPhase = "eval" - // DeployPhase is the deploy phase - DeployPhase = "deploy" - // TriggerReadyStatus is the ready status about trigger TriggerReadyStatus = "ready" // TriggerCompletedStatus is the completed status about trigger TriggerCompletedStatus = "completed" + + AnnotationsRoundsKey = "sedna.io/rounds" + AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples" + AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval" + + // WorkerS3StatusHandlerIntervalSeconds is interval time of handling s3 status of worker + WorkerS3StatusHandlerIntervalSeconds = 30 ) // LifelongLearningJobManager defines lifelong-learning-job Manager @@ -70,58 +80,47 @@ type Manager struct { // LifelongLearningJob defines config for lifelong-learning-job type Job struct { sednav1.LifelongLearningJob - Dataset *dataset.Dataset - Done chan struct{} - Storage storage.Storage - JobConfig *LLJobConfig -} - -// LLJobConfig defines config for lifelong-learning-job -type LLJobConfig struct { - UniqueIdentifier string - Version int - Phase string - WorkerStatus string - TrainTrigger trigger.Base - TriggerStatus string - TriggerTime time.Time - TrainDataURL string - EvalDataURL string - OutputDir string - OutputConfig *LLOutputConfig - DataSamples *LLDataSamples - TrainModel *Model - DeployModel *Model - EvalResult *Model - Lock sync.Mutex + JobConfig *JobConfig +} + +// JobConfig defines config for lifelong-learning-job +type JobConfig struct { + UniqueIdentifier string + Rounds int + TrainTrigger trigger.Base + TriggerTime time.Time + TrainTriggerStatus string + EvalTriggerStatus string + DeployTriggerStatus string + TrainDataURL string + EvalDataURL string + OutputDir string + OutputConfig *OutputConfig + DataSamples *DataSamples + DeployModel *Model + Lock sync.Mutex + Dataset *dataset.Dataset + Storage storage.Storage + Done chan struct{} } type Model = clienttypes.Model -// LLOutputConfig defines config for job output -type LLOutputConfig struct { - SamplesOutput map[string]string - TrainOutput string - EvalOutput string +// OutputConfig defines config for job output +type OutputConfig struct { + SamplesOutput map[string]string `json:"trainData"` + TrainOutput string `json:"trainOutput"` + EvalOutput string `json:"evalOutput"` } -// LLDataSamples defines samples information -type LLDataSamples struct { - Numbers int +// DataSamples defines samples information +type DataSamples struct { + PreviousNumbers int TrainSamples []string EvalVersionSamples [][]string EvalSamples []string } -const ( - // LLJobIterationIntervalSeconds is interval time of each iteration of job - LLJobIterationIntervalSeconds = 10 - // LLHandlerDataIntervalSeconds is interval time of handling dataset - LLHandlerDataIntervalSeconds = 10 - // LLLLEvalSamplesCapacity is capacity of eval samples - LLEvalSamplesCapacity = 5 -) - // New creates a lifelong-learning-job manager func New(client clienttypes.ClientI, datasetManager *dataset.Manager, options *options.LocalControllerOptions) *Manager { lm := Manager{ @@ -143,8 +142,6 @@ func (lm *Manager) Insert(message *clienttypes.Message) error { job, ok := lm.LifelongLearningJobMap[name] if !ok { job = &Job{} - job.Storage = storage.Storage{IsLocalStorage: false} - job.Done = make(chan struct{}) lm.LifelongLearningJobMap[name] = job first = true } @@ -153,21 +150,14 @@ func (lm *Manager) Insert(message *clienttypes.Message) error { return err } - credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] - if credential != "" { - if err := job.Storage.SetCredential(credential); err != nil { - return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err) - } + if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil { + return err } if first { go lm.startJob(name) } - if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil { - return err - } - return nil } @@ -179,51 +169,43 @@ func (lm *Manager) startJob(name string) { return } - job.JobConfig = new(LLJobConfig) - jobConfig := job.JobConfig - jobConfig.UniqueIdentifier = name - - err = lm.initJob(job) + err = lm.initJob(job, name) if err != nil { - klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier) + klog.Errorf("failed to init job (name=%s): %+v", name) return } klog.Infof("lifelong learning job(name=%s) is started", name) defer klog.Infof("lifelong learning job(name=%s) is stopped", name) + + // handle data from dataset go lm.handleData(job) - tick := time.NewTicker(LLJobIterationIntervalSeconds * time.Second) + tick := time.NewTicker(JobIterationIntervalSeconds * time.Second) for { select { - case <-job.Done: + case <-job.JobConfig.Done: return default: } - if job.Dataset == nil { - klog.V(3).Infof("job(name=%s) dataset not ready", - jobConfig.UniqueIdentifier) + cond := lm.getLatestCondition(job) + jobStage := cond.Stage - <-tick.C - continue - } - - switch jobConfig.Phase { - case TrainPhase: + switch jobStage { + case sednav1.LLJobTrain: err = lm.trainTask(job) - case EvalPhase: + case sednav1.LLJobEval: err = lm.evalTask(job) - case DeployPhase: + case sednav1.LLJobDeploy: err = lm.deployTask(job) default: - klog.Errorf("invalid phase: %s", jobConfig.Phase) + klog.Errorf("invalid phase: %s", jobStage) continue } if err != nil { - klog.Errorf("job(name=%s) complete the %s task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) + klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err) } <-tick.C @@ -234,40 +216,58 @@ func (lm *Manager) startJob(name string) { func (lm *Manager) trainTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - payload, ok, err := lm.triggerTrainTask(job) - if !ok { - return nil - } + latestCond := lm.getLatestCondition(job) + jobStage := latestCond.Stage + currentType := latestCond.Type - if err != nil { - klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) - return err + if currentType == sednav1.LLJobStageCondWaiting { + err := lm.loadDataset(job) + if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w", + jobConfig.UniqueIdentifier, err) } - err = lm.Client.WriteMessage(payload, job.getHeader()) - if err != nil { - klog.Errorf("job(name=%s) failed to write message: %v", - jobConfig.UniqueIdentifier, err) - return err + if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier) } - jobConfig.TriggerStatus = TriggerCompletedStatus + initTriggerStatus(jobConfig) - klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", - jobConfig.UniqueIdentifier, jobConfig.Phase) - } + if jobConfig.TrainTriggerStatus == TriggerReadyStatus { + payload, ok, err := lm.triggerTrainTask(job) + if !ok { + return nil + } - if jobConfig.WorkerStatus == workertypes.FailedStatus { - klog.Warningf("found the %sing phase worker that ran failed, "+ - "back the training phase triggering task", jobConfig.Phase) - backLLTaskStatus(jobConfig) - } + if err != nil { + klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v", + jobConfig.UniqueIdentifier, jobStage, err) + job.JobConfig.Rounds-- + return err + } + + err = lm.Client.WriteMessage(payload, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err) + job.JobConfig.Rounds-- + return err + } - if jobConfig.WorkerStatus == workertypes.CompletedStatus { - klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase) - nextLLTask(jobConfig) + forwardSamples(jobConfig, jobStage) + + err = lm.saveJobToDB(job) + if err != nil { + klog.Errorf("job(%s) failed to save job to db: %v", + jobConfig.UniqueIdentifier, err) + // continue anyway + } + + go lm.monitorS3Worker(job, sednav1.LLJobTrain) + + jobConfig.TrainTriggerStatus = TriggerCompletedStatus + klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", + jobConfig.UniqueIdentifier, jobStage) + } } return nil @@ -277,35 +277,39 @@ func (lm *Manager) trainTask(job *Job) error { func (lm *Manager) evalTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - payload, err := lm.triggerEvalTask(job) - if err != nil { - klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) - return err - } + latestCond := lm.getLatestCondition(job) + jobStage := latestCond.Stage + currentType := latestCond.Type - err = lm.Client.WriteMessage(payload, job.getHeader()) - if err != nil { - return err + if currentType == sednav1.LLJobStageCondWaiting { + err := lm.loadDataset(job) + if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w", + jobConfig.UniqueIdentifier, err) } - jobConfig.TriggerStatus = TriggerCompletedStatus + if jobConfig.EvalTriggerStatus == TriggerReadyStatus { + payload, err := lm.triggerEvalTask(job) + if err != nil { + klog.Errorf("job(%s) completed the %sing phase triggering task failed: %v", + jobConfig.UniqueIdentifier, jobStage, err) + return err + } - klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", - jobConfig.UniqueIdentifier, jobConfig.Phase) - } + err = lm.Client.WriteMessage(payload, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err) + return err + } - if jobConfig.WorkerStatus == workertypes.FailedStatus { - msg := fmt.Sprintf("job(name=%s) found the %sing phase worker that ran failed, "+ - "back the training phase triggering task", jobConfig.UniqueIdentifier, jobConfig.Phase) - klog.Errorf(msg) - return fmt.Errorf(msg) - } + forwardSamples(jobConfig, jobStage) + + go lm.monitorS3Worker(job, sednav1.LLJobEval) - if jobConfig.WorkerStatus == workertypes.CompletedStatus { - klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase) - nextLLTask(jobConfig) + jobConfig.EvalTriggerStatus = TriggerCompletedStatus + klog.Infof("job(%s) completed the %sing phase triggering task successfully", + jobConfig.UniqueIdentifier, jobStage) + } } return nil @@ -313,39 +317,37 @@ func (lm *Manager) evalTask(job *Job) error { // deployTask starts deploy task func (lm *Manager) deployTask(job *Job) error { - jobConfig := job.JobConfig + if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus { + jobConfig := job.JobConfig + var err error + + status := clienttypes.UpstreamMessage{Phase: string(sednav1.LLJobDeploy)} + models := lm.getJobStageModel(job, sednav1.LLJobDeploy) + if models != nil { + err = lm.updateDeployModelFile(job, models[0].URL, jobConfig.DeployModel.URL) + if err != nil { + status.Status = string(sednav1.LLJobStageCondFailed) + klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err) + return err + } - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - status := clienttypes.UpstreamMessage{} - status.Phase = DeployPhase - deployModel, err := lm.deployModel(job) - if err != nil { - klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err) + status.Status = string(sednav1.LLJobStageCondReady) + status.Input = &clienttypes.Input{Models: []Model{{Format: models[0].Format, URL: models[0].URL}}} } else { - klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier) - } - if err != nil || deployModel == nil { - status.Status = workertypes.FailedStatus - } else { - status.Status = workertypes.ReadyStatus - status.Input = &clienttypes.Input{ - Models: []Model{ - *deployModel, - }, - } + klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier) + status.Status = string(sednav1.LLJobStageCondCompleted) } - if err = lm.Client.WriteMessage(status, job.getHeader()); err != nil { + err = lm.Client.WriteMessage(status, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) completed the %s task failed: %v", + jobConfig.UniqueIdentifier, sednav1.LLJobDeploy, err) return err } - jobConfig.TriggerStatus = TriggerCompletedStatus + job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus + klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.LLJobDeploy) } - - nextLLTask(jobConfig) - - klog.Infof("job(name=%s) complete the deploy task successfully", jobConfig.UniqueIdentifier) - return nil } @@ -365,19 +367,22 @@ func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { return nil, false, nil } - jobConfig.Version++ + job.JobConfig.Rounds++ + rounds := jobConfig.Rounds var dataIndexURL string - jobConfig.TrainDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.TrainSamples, - jobConfig.OutputConfig.SamplesOutput["train"]) + jobConfig.TrainDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.TrainSamples, + jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err != nil { - klog.Errorf("train phase: write samples to the file(%s) is failed, error: %v", jobConfig.TrainDataURL, err) + job.JobConfig.Rounds-- + klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v", + jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err) return nil, false, err } dataURL := jobConfig.TrainDataURL - outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version)}, "/") - if job.Storage.IsLocalStorage { + outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/") + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL) outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) @@ -389,10 +394,11 @@ func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { OutputDir: outputDir, } msg := clienttypes.UpstreamMessage{ - Phase: TrainPhase, - Status: workertypes.ReadyStatus, + Phase: string(sednav1.LLJobTrain), + Status: string(sednav1.LLJobStageCondReady), Input: &input, } + jobConfig.TriggerTime = time.Now() return &msg, true, nil } @@ -402,89 +408,83 @@ func (lm *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro jobConfig := job.JobConfig var err error + latestCondition := lm.getLatestCondition(job) + + ms := lm.getJobStageModel(job, latestCondition.Stage) + if ms == nil { + return nil, err + } + var dataIndexURL string - jobConfig.EvalDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"]) + jobConfig.EvalDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"], + job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err != nil { - klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v", + klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v", jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err) return nil, err } - var models []Model - models = append(models, Model{ - Format: jobConfig.TrainModel.Format, - URL: jobConfig.TrainModel.URL, - }) - dataURL := jobConfig.EvalDataURL - outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version)}, "/") - if job.Storage.IsLocalStorage { + outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds)}, "/") + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL) outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) } input := clienttypes.Input{ - Models: models, + Models: ms, DataURL: dataURL, DataIndexURL: dataIndexURL, OutputDir: outputDir, } msg := &clienttypes.UpstreamMessage{ - Phase: EvalPhase, - Status: workertypes.ReadyStatus, + Phase: string(sednav1.LLJobEval), + Status: string(sednav1.LLJobStageCondReady), Input: &input, } return msg, nil } -// deployModel deploys model -func (lm *Manager) deployModel(job *Job) (*Model, error) { - jobConfig := job.JobConfig - - model := &Model{} - model = jobConfig.EvalResult - - if job.Storage.IsLocalStorage { - model.URL = util.AddPrefixPath(lm.VolumeMountPrefix, model.URL) +// updateDeployModelFile updates deploy model file +func (lm *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error { + if job.JobConfig.Storage.IsLocalStorage { + trainedModel = util.AddPrefixPath(lm.VolumeMountPrefix, trainedModel) } - deployModelURL := jobConfig.DeployModel.URL - if err := job.Storage.CopyFile(model.URL, deployModelURL); err != nil { - return nil, fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v", - model.URL, deployModelURL, err) + if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil { + return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w", + trainedModel, deployModel, err) } - klog.V(4).Infof("copy model(url=%s) to the deploy model(url=%s) successfully", model.URL, deployModelURL) - klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, model.URL) + klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel) - return model, nil + return nil } // createOutputDir creates the job output dir -func (job *Job) createOutputDir(jobConfig *LLJobConfig) error { +func (job *Job) createOutputDir(jobConfig *JobConfig) error { outputDir := jobConfig.OutputDir dirNames := []string{"data/train", "data/eval", "train", "eval"} - // lifelong_kb_index.pkl - if job.Storage.IsLocalStorage { + if job.JobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(outputDir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir) + klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, outputDir, err) return err } for _, v := range dirNames { dir := path.Join(outputDir, v) if err := util.CreateFolder(dir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir) + klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, dir, err) return err } } } - outputConfig := LLOutputConfig{ + outputConfig := OutputConfig{ SamplesOutput: map[string]string{ "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"), "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"), @@ -497,8 +497,18 @@ func (job *Job) createOutputDir(jobConfig *LLJobConfig) error { return nil } +func (lm *Manager) getLatestCondition(job *Job) sednav1.LLJobCondition { + jobConditions := job.Status.Conditions + var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{} + if len(jobConditions) > 0 { + // get latest pod and pod status + latestCondition = jobConditions[len(jobConditions)-1] + } + return latestCondition +} + // createFile creates data file and data index file -func (job *Job) createFile(dir string, format string, isLocalStorage bool) (string, string) { +func createFile(dir string, format string, isLocalStorage bool) (string, string) { switch strings.ToLower(format) { case dataset.TXTFormat: if isLocalStorage { @@ -512,16 +522,17 @@ func (job *Job) createFile(dir string, format string, isLocalStorage bool) (stri return "", "" } -// writeLLJSamples writes samples information to a file -func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, error) { - version := job.JobConfig.Version - format := job.Dataset.Spec.Format - urlPrefix := job.Dataset.URLPrefix +// writeSamples writes samples information to a file +func (lm *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) { + if samples == nil { + return "", "", fmt.Errorf("not samples") + } - subDir := strings.Join([]string{dir, strconv.Itoa(version)}, "/") - fileURL, absURLFile := job.createFile(subDir, format, job.Dataset.Storage.IsLocalStorage) + jobConfig := job.JobConfig + subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/") + fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage) - if job.Storage.IsLocalStorage { + if jobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(subDir); err != nil { return "", "", err } @@ -529,7 +540,7 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - if !job.Dataset.Storage.IsLocalStorage && absURLFile != "" { + if !jobConfig.Dataset.Storage.IsLocalStorage && absURLFile != "" { tempSamples := util.ParsingDatasetIndex(samples, urlPrefix) if err := job.writeByLine(tempSamples, absURLFile, format); err != nil { return "", "", err @@ -544,13 +555,13 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - localFileURL, localAbsURLFile := job.createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage) + localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage) if err := job.writeByLine(samples, localFileURL, format); err != nil { return "", "", err } - if err := job.Storage.Upload(localFileURL, fileURL); err != nil { + if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil { return "", "", err } @@ -561,7 +572,7 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil { + if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil { return "", "", err } @@ -584,7 +595,7 @@ func (job *Job) writeByLine(samples []string, fileURL string, format string) err w := bufio.NewWriter(file) if format == "csv" { - _, _ = fmt.Fprintln(w, job.Dataset.DataSource.Header) + _, _ = fmt.Fprintln(w, job.JobConfig.Dataset.DataSource.Header) } for _, line := range samples { @@ -605,62 +616,61 @@ func (job *Job) writeByLine(samples []string, fileURL string, format string) err // handleData updates samples information func (lm *Manager) handleData(job *Job) { - tick := time.NewTicker(LLHandlerDataIntervalSeconds * time.Second) + tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second) jobConfig := job.JobConfig iterCount := 0 for { select { - case <-job.Done: + case <-jobConfig.Done: return default: } - // in case dataset is not synced to LC before job synced to LC - // here call loadDataset in each period - err := lm.loadDataset(job) if iterCount%100 == 0 { - klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier) + klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier) } iterCount++ - if err != nil { - klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v", - jobConfig.UniqueIdentifier, - err) + + if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + // already loaded dataset <-tick.C continue } - dataset := job.Dataset + dataset := jobConfig.Dataset + currentNumberOfSamples := dataset.DataSource.NumberOfSamples + previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers - if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers { + if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples { samples := dataset.DataSource.TrainSamples - trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers)) + newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples + trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples)) jobConfig.Lock.Lock() jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples, - samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...) - klog.Infof("job(name=%s) current train samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples)) + samples[previousNumberOfSamples:previousNumberOfSamples+trainNum]...) + klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum) jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples, - samples[(jobConfig.DataSamples.Numbers+trainNum+1):]) + samples[previousNumberOfSamples+trainNum:]) jobConfig.Lock.Unlock() for _, v := range jobConfig.DataSamples.EvalVersionSamples { jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...) } - klog.Infof("job(name=%s) current eval samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples)) + klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples)) - jobConfig.DataSamples.Numbers = len(samples) + jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples } + <-tick.C } } +// loadDataset loads dataset information func (lm *Manager) loadDataset(job *Job) error { - if job.Dataset != nil { + if job.JobConfig.Dataset != nil { // already loaded return nil } @@ -671,30 +681,37 @@ func (lm *Manager) loadDataset(job *Job) error { return fmt.Errorf("not exists dataset(name=%s)", datasetName) } - jobConfig := job.JobConfig - jobConfig.DataSamples = &LLDataSamples{ - Numbers: 0, - TrainSamples: make([]string, 0), - EvalVersionSamples: make([][]string, 0), - EvalSamples: make([]string, 0), - } - - job.Dataset = dataset + job.JobConfig.Dataset = dataset return nil } // initJob inits the job object -func (lm *Manager) initJob(job *Job) error { +func (lm *Manager) initJob(job *Job, name string) error { + job.JobConfig = new(JobConfig) + jobConfig := job.JobConfig - jobConfig.TrainModel = new(Model) - jobConfig.EvalResult = new(Model) + jobConfig.UniqueIdentifier = name + + jobConfig.Storage = storage.Storage{IsLocalStorage: false} + credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] + if credential != "" { + if err := job.JobConfig.Storage.SetCredential(credential); err != nil { + return fmt.Errorf("failed to set storage credential: %w", err) + } + } + + jobConfig.Done = make(chan struct{}) jobConfig.Lock = sync.Mutex{} + jobConfig.Rounds = 0 + + jobConfig.DataSamples = &DataSamples{ + PreviousNumbers: 0, + TrainSamples: make([]string, 0), + EvalVersionSamples: make([][]string, 0), + EvalSamples: make([]string, 0), + } - jobConfig.Version = 0 - jobConfig.Phase = TrainPhase - jobConfig.WorkerStatus = workertypes.ReadyStatus - jobConfig.TriggerStatus = TriggerReadyStatus - trainTrigger, err := newLLTrigger(job.Spec.TrainSpec.Trigger) + trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger) if err != nil { return fmt.Errorf("failed to init train trigger: %+w", err) } @@ -702,13 +719,13 @@ func (lm *Manager) initJob(job *Job) error { outputDir := job.Spec.OutputDir - isLocalURL, err := job.Storage.IsLocalURL(outputDir) + isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir) if err != nil { - return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir) + return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", job.Name, outputDir, err) } if isLocalURL { - job.Storage.IsLocalStorage = true + jobConfig.Storage.IsLocalStorage = true outputDir = util.AddPrefixPath(lm.VolumeMountPrefix, outputDir) } @@ -723,10 +740,22 @@ func (lm *Manager) initJob(job *Job) error { URL: strings.Join([]string{strings.TrimRight(outputDir, "/"), "deploy/index.pkl"}, "/"), } + if err := lm.updateJobFromDB(job); err != nil { + klog.Errorf("job(%s) failed to update job from db: %v", name, err) + } + + initTriggerStatus(jobConfig) + return nil } -func newLLTrigger(t sednav1.LLTrigger) (trigger.Base, error) { +func initTriggerStatus(jobConfig *JobConfig) { + jobConfig.TrainTriggerStatus = TriggerReadyStatus + jobConfig.EvalTriggerStatus = TriggerReadyStatus + jobConfig.DeployTriggerStatus = TriggerReadyStatus +} + +func newTrigger(t sednav1.LLTrigger) (trigger.Base, error) { // convert trigger to map triggerMap := make(map[string]interface{}) c, err := json.Marshal(t) @@ -741,55 +770,83 @@ func newLLTrigger(t sednav1.LLTrigger) (trigger.Base, error) { return trigger.NewTrigger(triggerMap) } -// forwardSamplesLL deletes the samples information in the memory -func forwardSamplesLL(jobConfig *LLJobConfig) { - switch jobConfig.Phase { - case TrainPhase: - { - jobConfig.Lock.Lock() - jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0] - jobConfig.Lock.Unlock() - } - case EvalPhase: - { - if len(jobConfig.DataSamples.EvalVersionSamples) > LLEvalSamplesCapacity { - jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:] +// getModelsFromJobConditions gets models from job condition +func (lm *Manager) getModelsFromJobConditions(jobConditions []sednav1.LLJobCondition, stage sednav1.LLJobStage, currentType sednav1.LLJobStageConditionType, dataType string) []Model { + // TODO: runtime.type changes to common.type for gm and lc + for i := len(jobConditions) - 1; i >= 0; i-- { + var cond gmtypes.ConditionData + jobCond := jobConditions[i] + if jobCond.Stage == stage && jobCond.Type == currentType { + if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil { + continue + } + + if dataType == "input" { + if cond.Input == nil { + continue + } + + return cond.Input.Models + } else if dataType == "output" { + if cond.Output == nil { + continue + } + + return cond.Output.Models } } } -} -// backLLTaskStatus backs train task status -func backLLTaskStatus(jobConfig *LLJobConfig) { - jobConfig.Phase = TrainPhase - initLLTaskStatus(jobConfig) + return nil } -// initLLTaskStatus inits task status -func initLLTaskStatus(jobConfig *LLJobConfig) { - jobConfig.WorkerStatus = workertypes.ReadyStatus - jobConfig.TriggerStatus = TriggerReadyStatus -} +// getEvalResult gets eval result from job conditions +func (lm *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) { + jobConditions := job.Status.Conditions + models := lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output") -// nextLLTask converts next task status -func nextLLTask(jobConfig *LLJobConfig) { - switch jobConfig.Phase { - case TrainPhase: - { - forwardSamplesLL(jobConfig) - initLLTaskStatus(jobConfig) - jobConfig.Phase = EvalPhase + var result []map[string][]float64 + var err error + for _, m := range models { + bytes, err := json.Marshal(m.Metrics) + if err != nil { + return nil, err } - case EvalPhase: - { - forwardSamplesLL(jobConfig) - initLLTaskStatus(jobConfig) - jobConfig.Phase = DeployPhase + data := make(map[string][]float64) + if err = json.Unmarshal(bytes, &data); err != nil { + return nil, err } - case DeployPhase: - { - backLLTaskStatus(jobConfig) + + result = append(result, data) + } + return result, err +} + +// getJobStageModel gets model from job conditions for eval/deploy +func (lm *Manager) getJobStageModel(job *Job, jobStage sednav1.LLJobStage) (models []Model) { + jobConditions := job.Status.Conditions + + switch jobStage { + case sednav1.LLJobEval: + models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobTrain, sednav1.LLJobStageCondCompleted, "output") + case sednav1.LLJobDeploy: + models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output") + } + + return models +} + +// forwardSamples deletes the samples information in the memory +func forwardSamples(jobConfig *JobConfig, jobStage sednav1.LLJobStage) { + switch jobStage { + case sednav1.LLJobTrain: + jobConfig.Lock.Lock() + jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0] + jobConfig.Lock.Unlock() + case sednav1.LLJobEval: + if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity { + jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:] } } } @@ -798,8 +855,8 @@ func nextLLTask(jobConfig *LLJobConfig) { func (lm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) - if job, ok := lm.LifelongLearningJobMap[name]; ok && job.Done != nil { - close(job.Done) + if job, ok := lm.LifelongLearningJobMap[name]; ok && job.JobConfig.Done != nil { + close(job.JobConfig.Done) } delete(lm.LifelongLearningJobMap, name) @@ -811,7 +868,79 @@ func (lm *Manager) Delete(message *clienttypes.Message) error { return nil } -// Start starts LifelongLearningJob manager +// updateJobFromDB updates job from db +func (lm *Manager) updateJobFromDB(job *Job) error { + var err error + + previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier) + if err != nil { + return err + } + + m := metav1.ObjectMeta{} + if err != json.Unmarshal([]byte(previousJob.ObjectMeta), &m) { + return err + } + + rounds, ok := m.Annotations[AnnotationsRoundsKey] + if !ok { + return nil + } + + if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil { + return err + } + + numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey] + if !ok { + return nil + } + + if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil { + return err + } + + dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey] + if !ok { + return nil + } + + localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "") + + if !job.JobConfig.Storage.IsLocalStorage { + defer os.RemoveAll(localURL) + } + + if err != nil { + return err + } + + samples, err := dataset.GetSamples(dataFileOfEval) + if err != nil { + klog.Errorf("read file %s failed: %v", dataFileOfEval, err) + return err + } + + job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples) + + return nil +} + +// saveJobToDB saves job info to db +func (lm *Manager) saveJobToDB(job *Job) error { + ann := job.ObjectMeta.Annotations + if ann == nil { + ann = make(map[string]string) + } + + ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds) + ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers) + ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL + + return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec) +} + +// Start starts lifelong-learning-job manager func (lm *Manager) Start() error { go lm.monitorWorker() @@ -845,45 +974,65 @@ func (lm *Manager) monitorWorker() { Status: workerMessage.Status, Output: &wo, } - lm.Client.WriteMessage(msg, job.getHeader()) - - lm.handleWorkerMessage(job, workerMessage) + if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { + klog.Errorf("job(%s) failed to write message: %v", name, err) + continue + } else { + klog.Infof("job(%s) write message(%v) to GM", name, msg) + } } } -// handleWorkerMessage handles message from worker -func (lm *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) { - jobPhase := job.JobConfig.Phase - workerKind := workerMessage.Kind - if jobPhase != workerKind { - klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier, - jobPhase, workerKind) - return +func (lm *Manager) monitorS3Worker(job *Job, stage sednav1.LLJobStage) { + jobConfig := job.JobConfig + var statusFile string + switch stage { + case sednav1.LLJobTrain: + statusFile = strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/") + case sednav1.LLJobEval: + statusFile = strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/") } - var models []*Model - for _, result := range workerMessage.Results { - model := Model{ - Format: result["format"].(string), - URL: result["url"].(string)} - models = append(models, &model) - } + tempLocalFile := filepath.Join(os.TempDir(), "status.json") + for { + time.Sleep(WorkerS3StatusHandlerIntervalSeconds * time.Second) + localFile, err := jobConfig.Storage.Download(statusFile, tempLocalFile) + if err != nil { + continue + } - model := &Model{} - if len(models) != 1 { - return - } - model = models[0] + bytes, _ := ioutil.ReadFile(localFile) + workerMessage := workertypes.MessageContent{} + err = json.Unmarshal(bytes, &workerMessage) + if err != nil { + continue + } - job.JobConfig.WorkerStatus = workerMessage.Status + wo := clienttypes.Output{} + wo.Models = workerMessage.Results + wo.OwnerInfo = workerMessage.OwnerInfo - if job.JobConfig.WorkerStatus == workertypes.CompletedStatus { - switch job.JobConfig.Phase { - case TrainPhase: - job.JobConfig.TrainModel = model - case EvalPhase: - job.JobConfig.EvalResult = model + msg := &clienttypes.UpstreamMessage{ + Phase: workerMessage.Kind, + Status: workerMessage.Status, + Output: &wo, } + + name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind) + if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { + klog.Errorf("job(%s) failed to write message: %v", name, err) + continue + } + + if err = jobConfig.Storage.DeleteFile(statusFile); err != nil { + continue + } + + if err = jobConfig.Storage.DeleteFile(tempLocalFile); err != nil { + continue + } + + break } } diff --git a/pkg/localcontroller/storage/minio.go b/pkg/localcontroller/storage/minio.go index a4453ff48..0b5d672b2 100644 --- a/pkg/localcontroller/storage/minio.go +++ b/pkg/localcontroller/storage/minio.go @@ -145,3 +145,20 @@ func (mc *MinioClient) parseURL(URL string) (string, string, error) { return "", "", fmt.Errorf("invalid url(%s)", URL) } + +// deleteFile deletes file +func (mc *MinioClient) deleteFile(objectURL string) error { + bucket, absPath, err := mc.parseURL(objectURL) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut) + defer cancel() + + if err = mc.Client.RemoveObject(ctx, bucket, absPath, minio.RemoveObjectOptions{}); err != nil { + return fmt.Errorf("delete file(url=%s) failed, error: %+v", objectURL, err) + } + + return nil +} diff --git a/pkg/localcontroller/storage/storage.go b/pkg/localcontroller/storage/storage.go index 8855fc95f..25bb45e9a 100644 --- a/pkg/localcontroller/storage/storage.go +++ b/pkg/localcontroller/storage/storage.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "net/url" + "os" "path" "path/filepath" @@ -232,3 +233,19 @@ func (s *Storage) CopyFile(srcURL string, objectURL string) error { return nil } + +// DeleteFile deletes file +func (s *Storage) DeleteFile(objectURL string) error { + prefix, err := s.CheckURL(objectURL) + if err != nil { + return err + } + switch prefix { + case S3Prefix: + return s.MinioClient.deleteFile(objectURL) + case LocalPrefix: + return os.Remove(objectURL) + default: + return fmt.Errorf("invalid url(%s)", objectURL) + } +} diff --git a/scripts/installation/all-in-one.sh b/scripts/installation/all-in-one.sh index 8e352e853..281bc359d 100755 --- a/scripts/installation/all-in-one.sh +++ b/scripts/installation/all-in-one.sh @@ -18,7 +18,7 @@ # - A Kubernetes v1.21 cluster with multi worker nodes, default none worker node. # - KubeEdge with multi nodes, default is latest KubeEdge and one edge node. # - Sedna, default is latest release version. -# +# # It requires you: # - 2 CPUs or more # - 2GB+ free memory, depends on node number setting @@ -76,12 +76,12 @@ function prepare_env() { readonly MAX_EDGE_WORKER_NODES=3 # TODO: find a better way to figure this kind control plane - readonly CONTROL_PLANE_NAME=${CLUSTER_NAME}-control-plane + readonly CONTROL_PLANE_NAME=${CLUSTER_NAME}-control-plane readonly CLOUD_WORKER_NODE_NAME=${CLUSTER_NAME}-worker - # cloudcore default websocket port + # cloudcore default websocket port : ${CLOUDCORE_WS_PORT:=10000} - # cloudcore default cert port + # cloudcore default cert port : ${CLOUDCORE_CERT_PORT:=10002} # for debug purpose @@ -168,7 +168,7 @@ function patch_kindnet() { # which would require KUBERNETES_SERVICE_HOST/KUBERNETES_SERVICE_PORT environment variables. # But edgecore(up to 1.8.0) does not inject these environments. # Here make a patch: can be any value - run_in_control_plane kubectl set env -n kube-system daemonset/kindnet KUBERNETES_SERVICE_HOST=10.96.0.1 KUBERNETES_SERVICE_PORT=443 + run_in_control_plane kubectl set env -n kube-system daemonset/kindnet KUBERNETES_SERVICE_HOST=10.96.0.1 KUBERNETES_SERVICE_PORT=443 } function create_k8s_cluster() { @@ -224,10 +224,15 @@ function setup_cloudcore() { CLOUDCORE_EXPOSED_ADDR=$CLOUDCORE_EXPOSED_IP:$CLOUDCORE_EXPOSED_WS_PORT # keadm accepts version format: 1.8.0 - local version=${KUBEEDGE_VERSION/v} + local version=${KUBEEDGE_VERSION/v} run_in_control_plane bash -euc " # install cloudcore - pgrep cloudcore >/dev/null || keadm init --kubeedge-version=$version --advertise-address=$CLOUDCORE_ADVERTISE_ADDRESSES"' + pgrep cloudcore >/dev/null || { + # keadm 1.8.1 is incompatible with 1.9.1 since crds' upgrade + rm -rf /etc/kubeedge/crds + + keadm init --kubeedge-version=$version --advertise-address=$CLOUDCORE_ADVERTISE_ADDRESSES"' + } # wait token to be created exit_code=1 @@ -248,10 +253,168 @@ function setup_cloudcore() { KUBEEDGE_TOKEN=$(run_in_control_plane keadm gettoken) } +_change_detect_yaml_change() { + # execute the specified yq commands on stdin + # if same, output nothing + # else output the updated yaml + local yq_cmds="${1:-.}" + docker run -i --rm --entrypoint sh mikefarah/yq -c " + yq e . - > a + yq e '$yq_cmds' a > b + cmp -s a b || cat b + " +} + +reconfigure_edgecore() { + # update edgecore.yaml for every edge node + local script_name=reconfigure-edgecore + + if ((NUM_EDGE_NODES<1)); then + return + fi + + local yq_cmds="$1" + + # I want to leverage kubectl but k8s has no ways to run job on each node once + # see https://github.com/kubernetes/kubernetes/issues/64623 for more detais + # So I use Daemonset temporarily + + kubectl apply -f - < a && yq e '$(echo $yq_cmds)' a > b || exit 1 + cmp -s a b && echo No need to reconfigure \$NODE_NAME edgecore || { + # backup and overwrite config, kill edgecore and wait systemd restart it + cp /config/edgecore.yaml /config/edgecore.yaml.reconfigure_bk + cp b /config/edgecore.yaml + + pkill edgecore + + # check to edgecore process status + > pids + for i in 0 1 2 3; do + sleep 10 + { pidof edgecore || echo =\$i; } >> pids + done + [ \$(sort -u pids | wc -l) -le 2 ] && echo Reconfigure \$NODE_NAME edgecore successfully || { + echo Failed to reconfigure \$NODE_NAME edgecore >&2 + echo And recovery edgecore config yaml >&2 + cp a /config/edgecore.yaml + + # prevent daemonset execute this script too frequently + sleep 1800 + exit 1 + } + } + sleep inf +EOF + + # wait this script been executed + kubectl -n kubeedge rollout status --timeout=5m ds $script_name + # wait all edge nodes to be ready if restarted + kubectl wait --for=condition=ready node -l node-role.kubernetes.io/edge= -function setup_edgemesh() { - # TODO: wait for edgemesh one line installer - : + # keep this daemonset script for debugging + # kubectl -n kubeedge delete ds $script_name + +} + +reconfigure_cloudcore() { + + local config_file=/etc/kubeedge/config/cloudcore.yaml + local yq_cmds=$1 + + run_in_control_plane cat $config_file | + _change_detect_yaml_change "$yq_cmds" | + run_in_control_plane bash -euc " + cat > cc.yaml + ! grep -q . cc.yaml || { + echo reconfigure and restart cloudcore + cp $config_file ${config_file}.reconfigure_bk + cp cc.yaml $config_file + pkill cloudcore || true + # TODO: use a systemd service + (cloudcore &>> /var/log/kubeedge/cloudcore.log &) + } + + " + echo Reconfigure cloudcore successfully +} + +function install_edgemesh() { + if ((NUM_EDGE_NODES<1)); then + # no edge node, no edgemesh + return + fi + + local server_node_name + if ((NUM_CLOUD_WORKER_NODES>0)); then + server_node_name=${CLUSTER_NAME}-worker + else + server_node_name=${CLUSTER_NAME}-control-plane + fi + + echo Installing edgemesh with server on $server_node_name + # enable Local APIServer + reconfigure_cloudcore '.modules.dynamicController.enable=true' + + reconfigure_edgecore ' + .modules.edged.clusterDNS="169.254.96.16" + | .modules.edged.clusterDomain="cluster.local" + | .modules.metaManager.metaServer.enable=true + ' + + # no server.publicIP + # since allinone is in flat network, we just use private ip for edgemesh server + helm upgrade --install edgemesh \ + --set server.nodeName=$server_node_name \ + https://raw.githubusercontent.com/kubeedge/edgemesh/main/build/helm/edgemesh.tgz + + echo Install edgemesh successfully } function gen_cni_config() { @@ -365,12 +528,24 @@ function create_and_setup_edgenodes() { " # fix cni config file gen_cni_config | docker exec -i $containername tee /etc/cni/net.d/10-edgecni.conflist >/dev/null - + + { + # wait edge node to be created at background + nwait=20 + for((i=0;i/dev/null && break + sleep 3 + done + } & + done + # wait all edge nodes to be created + wait + } function clean_edgenodes() { - for cid in $(docker ps -a --filter label=sedna.io=sedna-mini-edge -q); do + for cid in $(docker ps -a --filter label=sedna.io=sedna-mini-edge -q); do docker stop $cid; docker rm $cid done } @@ -387,8 +562,6 @@ function setup_cloud() { setup_control_kubeconfig setup_cloudcore - - setup_edgemesh } function clean_cloud() { @@ -434,7 +607,7 @@ function get_latest_version() { # ... # "tag_name": "v1.0.0", # ... - + # Sometimes this will reach rate limit # https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting local url=https://api.github.com/repos/$repo/releases/latest @@ -458,7 +631,7 @@ function arch() { function _download_tool() { local name=$1 url=$2 - local file=/usr/local/bin/$name + local file=/usr/local/bin/$name curl -Lo $file $url chmod +x $file } @@ -488,9 +661,17 @@ function ensure_kubectl() { ensure_tool kubectl https://dl.k8s.io/release/v${version/v}/bin/linux/$(arch)/kubectl } +function ensure_helm() { + if check_command_exists helm; then + return + fi + curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash +} + function ensure_tools() { ensure_kind ensure_kubectl + ensure_helm } function main() { @@ -502,6 +683,11 @@ function main() { create) setup_cloud setup_edge + # wait all nodes to be ready + kubectl wait --for=condition=ready node --all + + # edgemesh need to be installed before sedna + install_edgemesh install_sedna log_info "Mini Sedna is created successfully" ;;