From cc08c26299c8a314b7eb9bcea4d33ded4b0ef4d9 Mon Sep 17 00:00:00 2001 From: Jie Pu Date: Wed, 10 Nov 2021 21:33:27 +0800 Subject: [PATCH 1/7] fix docs: index and quickstart Signed-off-by: Jie Pu --- docs/index.rst | 24 +- docs/quickstart.md | 384 +++--------------- docs/related_link.md | 15 +- .../helmet_detection/helmet_detection.yaml | 105 +++++ 4 files changed, 181 insertions(+), 347 deletions(-) create mode 100644 examples/incremental_learning/helmet_detection/helmet_detection.yaml diff --git a/docs/index.rst b/docs/index.rst index b9739ff71..c1a3c56b5 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,6 +9,16 @@ Sedna is an edge-cloud synergy AI project incubated in KubeEdge SIG AI. Benefiti Sedna can simply enable edge-cloud synergy capabilities to existing training and inference scripts, bringing the benefits of reducing costs, improving model performance, and protecting data privacy. +.. toctree:: + :maxdepth: 1 + :titlesonly: + :glob: + :caption: DEPLOY + + Cluster Installation used for production. + AllinOne Installation used for development. + Standalone Installation used for hello world. + .. toctree:: :maxdepth: 1 :caption: QUICK START @@ -31,23 +41,15 @@ Sedna can simply enable edge-cloud synergy capabilities to existing training and proposals/object-tracking -.. toctree:: - :maxdepth: 1 - :titlesonly: - :glob: - :caption: DEPLOY - - Installtion - Standalone - .. 
toctree:: :maxdepth: 1 :glob: :caption: EXAMPLES - examples/federated_learning/surface_defect_detection/README - examples/incremental_learning/helmet_detection/README examples/joint_inference/helmet_detection_inference/README + examples/incremental_learning/helmet_detection/README + examples/federated_learning/surface_defect_detection/README + examples/federated_learning/yolov5_coco128_mistnet/README examples/lifelong_learning/atcii/README examples/storage/s3/* diff --git a/docs/quickstart.md b/docs/quickstart.md index e2f27483d..8f1d881e0 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -1,138 +1,75 @@ # Quick Start -## Introduction +## Guide +- If you are new to Sedna, you can try the command step by step in this page. +- If you have played the following example, you can find more [examples](/examples/README.md). +- If you want to know more about sedna's architecture and component, you can find them in [sedna home]. +- If you're looking to contribute documentation improvements, you'll specifically want to see the [kubernetes documentation style guide] before [filing an issue][file-an-issue]. +- If you're planning to contribute code changes, you'll want to read the [development preparation guide] next. +- If you're planning to add a new synergy feature directly, you'll want to read the [guide][add-feature-guide] next. -Sedna provide some examples of running Sedna jobs in [here](/examples/README.md) +When done, you can also refer our [recommended Git workflow] and [pull request best practices] before submitting a pull request. -Here is a general guide to quick start an incremental learning job. -### Get Sedna +[proposals]: /docs/proposals +[development preparation guide]: ./prepare-environment.md +[add-feature-guide]: control-plane/add-a-new-synergy-feature.md -You can find the latest Sedna release [here](https://github.com/kubeedge/sedna/releases). 
+[sedna home]: https://github.com/kubeedge/sedna +[issues]: https://github.com/kubeedge/sedna/issues +[file-an-issue]: https://github.com/kubeedge/sedna/issues/new/choose +[file-a-fr]: https://github.com/kubeedge/sedna/issues/new?labels=kind%2Ffeature&template=enhancement.md -### Deploying Sedna +[github]: https://github.com/ +[kubernetes documentation style guide]: https://github.com/kubernetes/community/blob/master/contributors/guide/style-guide.md +[recommended Git workflow]: https://github.com/kubernetes/community/blob/master/contributors/guide/github-workflow.md#workflow +[pull request best practices]: https://github.com/kubernetes/community/blob/master/contributors/guide/pull-requests.md#best-practices-for-faster-reviews +[Kubernetes help wanted]: https://www.kubernetes.dev/docs/guide/help-wanted/ -Sedna provides two deployment methods, which can be selected according to your actual situation: -- Install Sedna on a cluster Step By Step: [guide here](setup/install.md). -- Install Sedna AllinOne : [guide here](setup/local-up.md). +The following is showing how to run an incremental learning job by sedna. +## Quick Start -### Component -Sedna consists of the following components: +#### 0. Check the Environment -![Architecture](./proposals/images/framework.png) - -#### GlobalManager -* Unified edge-cloud synergy AI task management -* Cross edge-cloud synergy management and collaboration -* Central Configuration Management - -#### LocalController -* Local process control of edge-cloud synergy AI tasks -* Local general management: model, dataset, and status synchronization - - -#### Worker -* Do inference or training, based on existing ML framework. -* Launch on demand, imagine they are docker containers. -* Different workers for different features. -* Could run on edge or cloud. - - -#### Lib -* Expose the Edge AI features to applications, i.e. training or inference programs. 
- - -### System Design - -There are three stages in a [incremental learning job](./proposals/incremental-learning.md): train/eval/deploy. - -![](./proposals/images/incremental-learning-state-machine.png) - -## Deployment Guide - -### 1. Prepare - -#### 1.1 Deployment Planning - -In this example, there is only one host with two nodes, which had creating a Kubernetes cluster with `kind`. - -| NAME | ROLES | Ip Address | Operating System | Host Configuration | Storage | Deployment Module | -| ----- | ------- | ----------------------------- | ----------------------- | ------------------ | ------- | ------------------------------------------------------------ | -| edge-node | agent,edge | 192.168.0.233 | Ubuntu 18.04.5 LTS | 8C16G | 500G | LC,lib, inference worker | -| sedna-control-plane | control-plane,master | 172.18.0.2 | Ubuntu 20.10 | 8C16G | 500G | GM,LC,lib,training worker,evaluate worker | - -#### 1.2 Network Requirements - -In this example the node **sedna-control-plane** has a internal-ip `172.18.0.2`, and **edge-node** can access it. - -### 2. Project Deployment - -#### 2.1 (optional) create virtual env +For Sedna all-in-one installation, it requires you: + - 1 VM **(one machine is OK, cluster is not required)** + - 2 CPUs or more + - 2GB+ free memory, depends on node number setting + - 10GB+ free disk space + - Internet connection(docker hub, github etc.) + - Linux platform, such as ubuntu/centos + - Docker 17.06+ +you can check the docker version by the following command, ```bash -python3.6 -m venv venv -source venv/bin/activate -pip3 install -U pip +docker -v +``` +after doing that, the output will be like this, that means your version fits the bill. 
+``` +Docker version 19.03.6, build 369ce74a3c ``` -#### 2.2 install sedna SDK - -```bash -cd $SENDA_ROOT/lib -python3.6 setup.py bdist_wheel -pip3 install dist/sedna*.whl -``` - -#### 2.3 Prepare your machine learning model and datasets - -##### 2.3.1 Encapsulate an Estimators - -Sedna implements several pre-made Estimators in [example](/examples), your can find them from the python scripts called `interface`. -Sedna supports the Estimators build from popular AI frameworks, such as TensorFlow, Pytorch, PaddlePaddle, MindSpore. Also Custom estimators can be used according to our interface document. -All Estimators—pre-made or custom ones—are classes should encapsulate the following actions: -- Training -- Evaluation -- Prediction -- Export/load - -Follow [here](/lib/sedna/README.md) for more details, a [toy_example](/examples/incremental_learning/helmet_detection/training/interface.py) like: - - -```python - -os.environ['BACKEND_TYPE'] = 'TENSORFLOW' - -class Estimator: - - def __init__(self, **kwargs): - ... - - def train(self, train_data, valid_data=None, **kwargs): - ... - - def evaluate(self, data, **kwargs): - ... - def predict(self, data, **kwargs): - ... - def load(self, model_url, **kwargs): - ... +#### 1. Deploy Sedna +Sedna provides three deployment methods, which can be selected according to your actual situation: - def save(self, model_path, **kwargs): - ... +- [Install Sedna AllinOne](setup/all-in-one.md). (used for development, here we use it) +- [Install Sedna local up](setup/local-up.md). +- [Install Sedna on a cluster](setup/install.md). - def get_weights(self): - ... +The [all-in-one script](/scripts/installation/all-in-one.sh) is used to install Sedna along with a mini Kubernetes environment locally, including: + - A Kubernetes v1.21 cluster with multi worker nodes, default zero worker node. + - KubeEdge with multi edge nodes, default is latest KubeEdge and one edge node. + - Sedna, default is the latest version. 
- def set_weights(self, weights): - ... -``` - -##### 2.3.2 Dataset prepare + ```bash + curl https://raw.githubusercontent.com/kubeedge/sedna/master/scripts/installation/all-in-one.sh | NUM_EDGE_NODES=2 bash - + ``` +#### 2. Download model and datasets In incremental_learning jobs, the following files will be indispensable: - base model: tensorflow object detection Fine-tuning a model from an existing checkpoint. @@ -161,101 +98,8 @@ tar -zxvf video.tar.gz ``` -#### 2.3.3 Scripts prepare - -In incremental_learning jobs, the following scripts will be indispensable: - -- train.py: script for model fine-tuning/training. -- eval.py: script for model evaluate. -- inference.py: script for data inference. - -You can also find demos [here](/examples/incremental_learning/helmet_detection). - -Some interfaces should be learn in job pipeline: - -- `BaseConfig` provides the capability of obtaining the config from env - -```python +#### 3. Create model and dataset object -from sedna.common.config import BaseConfig - -train_dataset_url = BaseConfig.train_dataset_url -model_url = BaseConfig.model_url - -``` - -- `Context` provides the capability of obtaining the context from CRD - -```python -from sedna.common.config import Context - -obj_threshold = Context.get_parameters("obj_threshold") -nms_threshold = Context.get_parameters("nms_threshold") -input_shape = Context.get_parameters("input_shape") -epochs = Context.get_parameters('epochs') -batch_size = Context.get_parameters('batch_size') - -``` - -- `datasources` base class, as that core feature of sedna require identifying the features and labels from data input, we specify that the first parameter for train/evaluate of the ML framework - -```python -from sedna.datasources import BaseDataSource - - -train_data = BaseDataSource(data_type="train") -train_data.x = [] -train_data.y = [] -for item in mnist_ds.create_dict_iterator(): - train_data.x.append(item["image"].asnumpy()) - train_data.y.append(item["label"].asnumpy()) -``` - 
-- `sedna.core` contain all edge-cloud features, Please note that each feature has its own parameters. -- **Hard Example Mining Algorithms** in IncrementalLearning named `hard_example_mining` - -```python -from sedna.core.incremental_learning import IncrementalLearning - -hard_example_mining = IncrementalLearning.get_hem_algorithm_from_config( - threshold_img=0.9 -) - -# initial an incremental instance -incremental_instance = IncrementalLearning( - estimator=Estimator, - hard_example_mining=hem_dict -) - -# Call the interface according to the job state - -# train.py -incremental_instance.train(train_data=train_data, epochs=epochs, - batch_size=batch_size, - class_names=class_names, - input_shape=input_shape, - obj_threshold=obj_threshold, - nms_threshold=nms_threshold) - -# inference -results, _, is_hard_example = incremental_instance.inference( - data, input_shape=input_shape) - - -``` - - -### 3. Configuration - -##### 3.1 Prepare Image -This example uses the image: -``` -kubeedge/sedna-example-incremental-learning-helmet-detection:v0.4.0 -``` - -This image is generated by the script [build_images.sh](/examples/build_image.sh), used for creating training, eval and inference worker. - -##### 3.2 Create Incremental Job In this example, `$WORKER_NODE` is a custom node, you can fill it which you actually run. ``` @@ -305,140 +149,20 @@ spec: EOF ``` - -### 4. Run - -* incremental learning supports hot model updates and cold model updates. Job support -cold model updates default. If you want to use hot model updates, please to add the following fields: - -```yaml -deploySpec: - model: - hotUpdateEnabled: true - pollPeriodSeconds: 60 # default value is 60 -``` +#### 4. 
Start an incremental learning job * create the job: ``` -IMAGE=kubeedge/sedna-example-incremental-learning-helmet-detection:v0.4.0 - -kubectl create -f - <" - threshold: 500 - metric: num_of_samples - evalSpec: - template: - spec: - nodeName: $WORKER_NODE - containers: - - image: $IMAGE - name: eval-worker - imagePullPolicy: IfNotPresent - args: ["eval.py"] - env: - - name: "input_shape" - value: "352,640" - - name: "class_names" - value: "person,helmet,helmet-on,helmet-off" - deploySpec: - model: - name: "deploy-model" - hotUpdateEnabled: true - pollPeriodSeconds: 60 - trigger: - condition: - operator: ">" - threshold: 0.1 - metric: precision_delta - hardExampleMining: - name: "IBT" - parameters: - - key: "threshold_img" - value: "0.9" - - key: "threshold_box" - value: "0.9" - template: - spec: - nodeName: $WORKER_NODE - containers: - - image: $IMAGE - name: infer-worker - imagePullPolicy: IfNotPresent - args: ["inference.py"] - env: - - name: "input_shape" - value: "352,640" - - name: "video_url" - value: "file://video/video.mp4" - - name: "HE_SAVED_URL" - value: "/he_saved_url" - volumeMounts: - - name: localvideo - mountPath: /video/ - - name: hedir - mountPath: /he_saved_url - resources: # user defined resources - limits: - memory: 2Gi - volumes: # user defined volumes - - name: localvideo - hostPath: - path: /incremental_learning/video/ - type: DirectoryorCreate - - name: hedir - hostPath: - path: /incremental_learning/he/ - type: DirectoryorCreate - outputDir: "/output" -EOF +kubectl create -f https://raw.githubusercontent.com/kubeedge/sedna/main/examples/incremental_learning/helmet_detection/helmet_detection.yaml ``` 1. The `Dataset` describes data with labels and `HE_SAVED_URL` indicates the address of the deploy container for uploading hard examples. Users will mark label for the hard examples in the address. 2. Ensure that the path of outputDir in the YAML file exists on your node. This path will be directly mounted to the container. -### 5. Monitor +#### 5.
Check the result -### Check Incremental Learning Job Query the service status: ``` diff --git a/docs/related_link.md b/docs/related_link.md index 5e8e07565..f6fdf542a 100644 --- a/docs/related_link.md +++ b/docs/related_link.md @@ -1,9 +1,12 @@ -[支持边云协同终身学习特性,KubeEdge 子项目 Sedna 0.3.0 版本发布!](https://juejin.cn/post/6970866022286884878) -[【HDC.Cloud 2021】边云协同,打通AI最后一公里](https://xie.infoq.cn/article/b22e72afe8de50ca34269bb21) +### Release +[Sedna0.4.0发布,支持表征提取联邦学习,减少边侧资源需求](https://mp.weixin.qq.com/s/_m5q0t0yYY7gnfQUAssjFg) +[支持边云协同终身学习特性,KubeEdge子项目Sedna 0.3.0版本发布!](https://mp.weixin.qq.com/s/kSFL_pf2BTyVvH5c9zv0Jg) +[体验边云协同AI框架!KubeEdge子项目Sedna 0.1版本发布](https://mp.weixin.qq.com/s/3Ei8ynSAxnfuoIWYdb7Gpg) +[加速AI边云协同创新!KubeEdge社区建立Sedna子项目](https://mp.weixin.qq.com/s/FX2DOsctS_Z7CKHndFByRw) +[边缘智能还能怎么玩?KubeEdge AI SIG 带你飞](https://mp.weixin.qq.com/s/t10_ZrZW42AZoYnisVAbpg) -[KubeEdge Sedna如何实现边缘AI模型精度提升50%](https://www.huaweicloud.com/zhishi/hdc2021-Track-24-18.html) -[KubeEdge子项目Sedna 0.1版本发布!支持边云协同增量学习、联邦学习、协同推理](https://mp.weixin.qq.com/s/3Ei8ynSAxnfuoIWYdb7Gpg) - -[加速AI边云协同创新!KubeEdge社区建立Sedna子项目](https://cloud.tencent.com/developer/article/1791739) +### Meetup and Conference +[【HDC.Cloud 2021】边云协同,打通AI最后一公里](https://xie.infoq.cn/article/b22e72afe8de50ca34269bb21) +[KubeEdge Sedna如何实现边缘AI模型精度提升50%](https://www.huaweicloud.com/zhishi/hdc2021-Track-24-18.html) diff --git a/examples/incremental_learning/helmet_detection/helmet_detection.yaml b/examples/incremental_learning/helmet_detection/helmet_detection.yaml new file mode 100644 index 000000000..2561be3a8 --- /dev/null +++ b/examples/incremental_learning/helmet_detection/helmet_detection.yaml @@ -0,0 +1,105 @@ +apiVersion: sedna.io/v1alpha1 +kind: IncrementalLearningJob +metadata: + name: helmet-detection-demo +spec: + initialModel: + name: "initial-model" + dataset: + name: "incremental-dataset" + trainProb: 0.8 + trainSpec: + template: + spec: + nodeName: $WORKER_NODE + containers: + - image: $IMAGE + name: 
train-worker + imagePullPolicy: IfNotPresent + args: [ "train.py" ] + env: + - name: "batch_size" + value: "32" + - name: "epochs" + value: "1" + - name: "input_shape" + value: "352,640" + - name: "class_names" + value: "person,helmet,helmet-on,helmet-off" + - name: "nms_threshold" + value: "0.4" + - name: "obj_threshold" + value: "0.3" + trigger: + checkPeriodSeconds: 60 + timer: + start: 02:00 + end: 20:00 + condition: + operator: ">" + threshold: 500 + metric: num_of_samples + evalSpec: + template: + spec: + nodeName: $WORKER_NODE + containers: + - image: $IMAGE + name: eval-worker + imagePullPolicy: IfNotPresent + args: [ "eval.py" ] + env: + - name: "input_shape" + value: "352,640" + - name: "class_names" + value: "person,helmet,helmet-on,helmet-off" + deploySpec: + model: + name: "deploy-model" + hotUpdateEnabled: true + pollPeriodSeconds: 60 + trigger: + condition: + operator: ">" + threshold: 0.1 + metric: precision_delta + hardExampleMining: + name: "IBT" + parameters: + - key: "threshold_img" + value: "0.9" + - key: "threshold_box" + value: "0.9" + template: + spec: + nodeName: $WORKER_NODE + containers: + - image: $IMAGE + name: infer-worker + imagePullPolicy: IfNotPresent + args: [ "inference.py" ] + env: + - name: "input_shape" + value: "352,640" + - name: "video_url" + value: "file://video/video.mp4" + - name: "HE_SAVED_URL" + value: "/he_saved_url" + volumeMounts: + - name: localvideo + mountPath: /video/ + - name: hedir + mountPath: /he_saved_url + resources: # user defined resources + limits: + memory: 2Gi + volumes: # user defined volumes + - name: localvideo + hostPath: + path: /incremental_learning/video/ + type: DirectoryorCreate + - name: hedir + hostPath: + path: /incremental_learning/he/ + type: DirectoryorCreate + outputDir: "/output" \ No newline at end of file From 86839970180bd454ebf08c3faf9135a955105024 Mon Sep 17 00:00:00 2001 From: Jie Pu Date: Thu, 11 Nov 2021 11:22:16 +0800 Subject: [PATCH 2/7] docs update: index and quickstart 
Signed-off-by: Jie Pu --- docs/api/lib/index.rst | 2 +- docs/conf.py | 2 +- docs/index.rst | 36 ++- docs/index/guide.md | 24 ++ docs/{setup => index}/quick-start.md | 0 docs/index/quickstart.md | 183 ++++++++++++++++ docs/{ => index}/related_link.md | 0 docs/{ => index}/roadmap.md | 16 +- docs/quickstart.md | 205 ------------------ docs/setup/install.md | 2 +- .../helmet_detection_inference.yaml | 66 ++++++ 11 files changed, 297 insertions(+), 239 deletions(-) create mode 100644 docs/index/guide.md rename docs/{setup => index}/quick-start.md (100%) create mode 100644 docs/index/quickstart.md rename docs/{ => index}/related_link.md (100%) rename docs/{ => index}/roadmap.md (55%) delete mode 100644 docs/quickstart.md create mode 100644 examples/joint_inference/helmet_detection_inference/helmet_detection_inference.yaml diff --git a/docs/api/lib/index.rst b/docs/api/lib/index.rst index 15569e51d..d0628f67c 100644 --- a/docs/api/lib/index.rst +++ b/docs/api/lib/index.rst @@ -1,5 +1,5 @@ =========================================== -Sedna Python SDK +Python API Use Guide =========================================== .. mdinclude:: ../../../lib/sedna/README.md \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index a932286f8..8808b56bb 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -58,7 +58,7 @@ # -- Project information ----------------------------------------------------- project = 'Sedna' -copyright = '2020, Kubeedge' +copyright = '2021, Kubeedge' author = 'Kubeedge' version = __version__ diff --git a/docs/index.rst b/docs/index.rst index c1a3c56b5..6def8ac57 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,21 +9,24 @@ Sedna is an edge-cloud synergy AI project incubated in KubeEdge SIG AI. Benefiti Sedna can simply enable edge-cloud synergy capabilities to existing training and inference scripts, bringing the benefits of reducing costs, improving model performance, and protecting data privacy. + .. 
toctree:: :maxdepth: 1 - :titlesonly: - :glob: - :caption: DEPLOY + :caption: GUIDE + + index/guide + index/quickstart - Cluster Installation used for production. - AllinOne Installation used for development. - Standalone Installation used for hello world. .. toctree:: :maxdepth: 1 - :caption: QUICK START + :titlesonly: + :glob: + :caption: DEPLOY - quickstart + Cluster Installation (for production) + AllinOne Installation (for development) + Standalone Installation (for hello world) .. toctree:: @@ -56,11 +59,13 @@ Sedna can simply enable edge-cloud synergy capabilities to existing training and .. toctree:: :maxdepth: 1 - :caption: API + :caption: API REFERENCE :titlesonly: :glob: api/lib/* + Python API + .. toctree:: :maxdepth: 1 @@ -71,26 +76,17 @@ Sedna can simply enable edge-cloud synergy capabilities to existing training and Control Plane -.. toctree:: - :maxdepth: 1 - :caption: API REFERENCE - :titlesonly: - :glob: - - autoapi/lib/sedna/index - - .. toctree:: :caption: ROADMAP :hidden: - roadmap + index/roadmap RELATED LINKS ============= -.. mdinclude:: related_link.md +.. mdinclude:: index/related_link.md Indices and tables ================== diff --git a/docs/index/guide.md b/docs/index/guide.md new file mode 100644 index 000000000..155fc1f48 --- /dev/null +++ b/docs/index/guide.md @@ -0,0 +1,24 @@ +## Guide +- If you are new to Sedna, you can try the command step by step in [quick start](./quickstart.md). +- If you have played the above example, you can find more [examples](/examples/README.md). +- If you want to know more about sedna's architecture and component, you can find them in [sedna home]. +- If you're looking to contribute documentation improvements, you'll specifically want to see the [kubernetes documentation style guide] before [filing an issue][file-an-issue]. +- If you're planning to contribute code changes, you'll want to read the [development preparation guide] next. 
+- If you're planning to add a new synergy feature directly, you'll want to read the [guide][add-feature-guide] next. + + +[proposals]: /docs/proposals +[development preparation guide]: ../contributing/prepare-environment.md +[add-feature-guide]: ../contributing/control-plane/add-a-new-synergy-feature.md + +[sedna home]: https://github.com/kubeedge/sedna +[issues]: https://github.com/kubeedge/sedna/issues +[file-an-issue]: https://github.com/kubeedge/sedna/issues/new/choose +[file-a-fr]: https://github.com/kubeedge/sedna/issues/new?labels=kind%2Ffeature&template=enhancement.md + +[github]: https://github.com/ +[kubernetes documentation style guide]: https://github.com/kubernetes/community/blob/master/contributors/guide/style-guide.md +[recommended Git workflow]: https://github.com/kubernetes/community/blob/master/contributors/guide/github-workflow.md#workflow +[pull request best practices]: https://github.com/kubernetes/community/blob/master/contributors/guide/pull-requests.md#best-practices-for-faster-reviews +[Kubernetes help wanted]: https://www.kubernetes.dev/docs/guide/help-wanted/ + diff --git a/docs/setup/quick-start.md b/docs/index/quick-start.md similarity index 100% rename from docs/setup/quick-start.md rename to docs/index/quick-start.md diff --git a/docs/index/quickstart.md b/docs/index/quickstart.md new file mode 100644 index 000000000..e3c6c0cd5 --- /dev/null +++ b/docs/index/quickstart.md @@ -0,0 +1,183 @@ + +# Quick Start + + +The following is showing how to run a joint inference job by sedna. +## Quick Start + +#### 0. Check the Environment + +For Sedna all-in-one installation, it requires you: + - 1 VM **(one machine is OK, cluster is not required)** + - 2 CPUs or more + - 2GB+ free memory, depends on node number setting + - 10GB+ free disk space + - Internet connection(docker hub, github etc.) 
+ - Linux platform, such as ubuntu/centos + - Docker 17.06+ + +You can check the Docker version with the following command: +```bash +docker -v +``` +The output should look like the following, which means your version meets the requirement: +``` +Docker version 19.03.6, build 369ce74a3c +``` + + +#### 1. Deploy Sedna +Sedna provides three deployment methods, which can be selected according to your actual situation: + +- [Install Sedna AllinOne](../setup/all-in-one.md). (used for development, here we use it) +- [Install Sedna local up](../setup/local-up.md). +- [Install Sedna on a cluster](../setup/install.md). + +The [all-in-one script](/scripts/installation/all-in-one.sh) is used to install Sedna along with a mini Kubernetes environment locally, including: + - A Kubernetes v1.21 cluster with multiple worker nodes (default: zero worker nodes). + - KubeEdge with multiple edge nodes (default: the latest KubeEdge and one edge node). + - Sedna, default is the latest version. + + ```bash + curl https://raw.githubusercontent.com/kubeedge/sedna/master/scripts/installation/all-in-one.sh | NUM_EDGE_NODES=1 bash - + ``` + +Then you get two nodes, `sedna-mini-control-plane` and `sedna-mini-edge0`. You can get into each node with the following commands: + +```bash +# get into cloud node +docker exec -it sedna-mini-control-plane bash +``` + +```bash +# get into edge node +docker exec -it sedna-mini-edge0 bash +``` + +#### 1. Prepare Data and Model File + +* step1: download [little model](https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/little-model.tar.gz) to your edge node. + +``` +mkdir -p /data/little-model +cd /data/little-model +wget https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/little-model.tar.gz +tar -zxvf little-model.tar.gz +``` + +* step2: download [big model](https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/big-model.tar.gz) to your cloud node.
+ +``` +mkdir -p /data/big-model +cd /data/big-model +wget https://kubeedge.obs.cn-north-1.myhuaweicloud.com/examples/helmet-detection-inference/big-model.tar.gz +tar -zxvf big-model.tar.gz +``` + +#### 2. Create Big Model Resource Object for Cloud +On the cloud node: +``` +kubectl create -f - <" - threshold: 500 - metric: num_of_samples -``` - -## API - -- control-plane: Please refer to this [link](api/crd). -- Lib: Please refer to this [link](api/lib). - -## Contributing - -Contributions are very welcome! - -- control-plane: Please refer to this [link](contributing/control-plane/development.md). -- Lib: Please refer to this [link](contributing/lib/development.md). - -## Community - -Sedna is an open source project and in the spirit of openness and freedom, we welcome new contributors to join us. -You can get in touch with the community according to the ways: -* [Github Issues](https://github.com/kubeedge/sedna/issues) -* [Regular Community Meeting](https://zoom.us/j/4167237304) -* [slack channel](https://app.slack.com/client/TDZ5TGXQW/C01EG84REVB/details) - diff --git a/docs/setup/install.md b/docs/setup/install.md index 19607f313..f5c1feb38 100644 --- a/docs/setup/install.md +++ b/docs/setup/install.md @@ -4,7 +4,7 @@ For interested readers, Sedna also has two important components that would be me If you don't have an existing Kubernetes, you can: 1) Install Kubernetes by following the [Kubernetes website](https://kubernetes.io/docs/setup/). -2) Or follow [quick start](quick-start.md) for other options. +2) Or follow [quick start](../index/quick-start.md) for other options.
### Prerequisites - [Kubectl][kubectl] with right kubeconfig diff --git a/examples/joint_inference/helmet_detection_inference/helmet_detection_inference.yaml b/examples/joint_inference/helmet_detection_inference/helmet_detection_inference.yaml new file mode 100644 index 000000000..9ecb44301 --- /dev/null +++ b/examples/joint_inference/helmet_detection_inference/helmet_detection_inference.yaml @@ -0,0 +1,66 @@ +apiVersion: sedna.io/v1alpha1 +kind: JointInferenceService +metadata: + name: helmet-detection-inference-example + namespace: default +spec: + edgeWorker: + model: + name: "helmet-detection-inference-little-model" + hardExampleMining: + name: "IBT" + parameters: + - key: "threshold_img" + value: "0.9" + - key: "threshold_box" + value: "0.9" + template: + spec: + nodeName: $EDGE_NODE + containers: + - image: kubeedge/sedna-example-joint-inference-helmet-detection-little:v0.3.0 + imagePullPolicy: IfNotPresent + name: little-model + env: # user defined environments + - name: input_shape + value: "416,736" + - name: "video_url" + value: "rtsp://localhost/video" + - name: "all_examples_inference_output" + value: "/data/output" + - name: "hard_example_cloud_inference_output" + value: "/data/hard_example_cloud_inference_output" + - name: "hard_example_edge_inference_output" + value: "/data/hard_example_edge_inference_output" + resources: # user defined resources + requests: + memory: 64M + cpu: 100m + limits: + memory: 2Gi + volumeMounts: + - name: outputdir + mountPath: /data/ + volumes: # user defined volumes + - name: outputdir + hostPath: + # user must create the directory in host + path: /joint_inference/output + type: Directory + + cloudWorker: + model: + name: "helmet-detection-inference-big-model" + template: + spec: + nodeName: $CLOUD_NODE + containers: + - image: kubeedge/sedna-example-joint-inference-helmet-detection-big:v0.3.0 + name: big-model + imagePullPolicy: IfNotPresent + env: # user defined environments + - name: "input_shape" + value: "544,544" + 
resources: # user defined resources + requests: + memory: 2Gi From b6444031f5cade25e3c29541de491407ad2ac351 Mon Sep 17 00:00:00 2001 From: llhuii Date: Mon, 13 Dec 2021 15:19:40 +0800 Subject: [PATCH 3/7] Upgrade golang from 1.14 to 1.16 Signed-off-by: llhuii --- .github/workflows/main.yaml | 14 +++++++------- build/gm/Dockerfile | 2 +- build/lc/Dockerfile | 2 +- go.mod | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 736a7b073..e9a403fc1 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -21,7 +21,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - name: Checkout code uses: actions/checkout@v2 @@ -57,7 +57,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -78,7 +78,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -102,7 +102,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -138,7 +138,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -159,7 +159,7 @@ jobs: - name: Install Go uses: actions/setup-go@v2 with: - go-version: 1.14.x + go-version: 1.16.x - uses: actions/cache@v2 with: @@ -171,4 +171,4 @@ jobs: with: fetch-depth: 0 - - run: make docker-cross-build \ No newline at end of file + - run: make docker-cross-build diff --git a/build/gm/Dockerfile b/build/gm/Dockerfile index 5c1b03ce5..3b31fda9f 100644 --- a/build/gm/Dockerfile +++ b/build/gm/Dockerfile @@ -14,7 +14,7 @@ # Add cross buildx improvement # _speed_buildx_for_go_ -FROM golang:1.14-alpine3.11 AS builder +FROM golang:1.16-alpine3.15 AS builder LABEL stage=builder ARG GO_LDFLAGS diff --git 
a/build/lc/Dockerfile b/build/lc/Dockerfile index e96c429da..71b4f7aae 100644 --- a/build/lc/Dockerfile +++ b/build/lc/Dockerfile @@ -15,7 +15,7 @@ # Add cross buildx improvement # LC has built sqlite3 which requires CGO with CGO_ENABLED=1 # _speed_buildx_for_cgo_alpine_ -FROM golang:1.14-alpine3.11 AS builder +FROM golang:1.16-alpine3.15 AS builder LABEL stage=builder ARG GO_LDFLAGS diff --git a/go.mod b/go.mod index 2f41d6c5d..a1db6d821 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/kubeedge/sedna -go 1.14 +go 1.16 require ( github.com/emicklei/go-restful/v3 v3.4.0 From b2d27074891c0616db7bb1c1e4454942ff5ccc52 Mon Sep 17 00:00:00 2001 From: llhuii Date: Wed, 8 Dec 2021 11:15:45 +0800 Subject: [PATCH 4/7] all-in-one: add edgemesh installation Signed-off-by: llhuii --- scripts/installation/all-in-one.sh | 218 ++++++++++++++++++++++++++--- 1 file changed, 202 insertions(+), 16 deletions(-) diff --git a/scripts/installation/all-in-one.sh b/scripts/installation/all-in-one.sh index 8e352e853..281bc359d 100755 --- a/scripts/installation/all-in-one.sh +++ b/scripts/installation/all-in-one.sh @@ -18,7 +18,7 @@ # - A Kubernetes v1.21 cluster with multi worker nodes, default none worker node. # - KubeEdge with multi nodes, default is latest KubeEdge and one edge node. # - Sedna, default is latest release version. 
-# +# # It requires you: # - 2 CPUs or more # - 2GB+ free memory, depends on node number setting @@ -76,12 +76,12 @@ function prepare_env() { readonly MAX_EDGE_WORKER_NODES=3 # TODO: find a better way to figure this kind control plane - readonly CONTROL_PLANE_NAME=${CLUSTER_NAME}-control-plane + readonly CONTROL_PLANE_NAME=${CLUSTER_NAME}-control-plane readonly CLOUD_WORKER_NODE_NAME=${CLUSTER_NAME}-worker - # cloudcore default websocket port + # cloudcore default websocket port : ${CLOUDCORE_WS_PORT:=10000} - # cloudcore default cert port + # cloudcore default cert port : ${CLOUDCORE_CERT_PORT:=10002} # for debug purpose @@ -168,7 +168,7 @@ function patch_kindnet() { # which would require KUBERNETES_SERVICE_HOST/KUBERNETES_SERVICE_PORT environment variables. # But edgecore(up to 1.8.0) does not inject these environments. # Here make a patch: can be any value - run_in_control_plane kubectl set env -n kube-system daemonset/kindnet KUBERNETES_SERVICE_HOST=10.96.0.1 KUBERNETES_SERVICE_PORT=443 + run_in_control_plane kubectl set env -n kube-system daemonset/kindnet KUBERNETES_SERVICE_HOST=10.96.0.1 KUBERNETES_SERVICE_PORT=443 } function create_k8s_cluster() { @@ -224,10 +224,15 @@ function setup_cloudcore() { CLOUDCORE_EXPOSED_ADDR=$CLOUDCORE_EXPOSED_IP:$CLOUDCORE_EXPOSED_WS_PORT # keadm accepts version format: 1.8.0 - local version=${KUBEEDGE_VERSION/v} + local version=${KUBEEDGE_VERSION/v} run_in_control_plane bash -euc " # install cloudcore - pgrep cloudcore >/dev/null || keadm init --kubeedge-version=$version --advertise-address=$CLOUDCORE_ADVERTISE_ADDRESSES"' + pgrep cloudcore >/dev/null || { + # keadm 1.8.1 is incompatible with 1.9.1 since crds' upgrade + rm -rf /etc/kubeedge/crds + + keadm init --kubeedge-version=$version --advertise-address=$CLOUDCORE_ADVERTISE_ADDRESSES"' + } # wait token to be created exit_code=1 @@ -248,10 +253,168 @@ function setup_cloudcore() { KUBEEDGE_TOKEN=$(run_in_control_plane keadm gettoken) } +_change_detect_yaml_change() { + # 
execute the specified yq commands on stdin + # if same, output nothing + # else output the updated yaml + local yq_cmds="${1:-.}" + docker run -i --rm --entrypoint sh mikefarah/yq -c " + yq e . - > a + yq e '$yq_cmds' a > b + cmp -s a b || cat b + " +} + +reconfigure_edgecore() { + # update edgecore.yaml for every edge node + local script_name=reconfigure-edgecore + + if ((NUM_EDGE_NODES<1)); then + return + fi + + local yq_cmds="$1" + + # I want to leverage kubectl but k8s has no ways to run job on each node once + # see https://github.com/kubernetes/kubernetes/issues/64623 for more details + # So I use Daemonset temporarily + + kubectl apply -f - < a && yq e '$(echo $yq_cmds)' a > b || exit 1 + cmp -s a b && echo No need to reconfigure \$NODE_NAME edgecore || { + # backup and overwrite config, kill edgecore and wait systemd restart it + cp /config/edgecore.yaml /config/edgecore.yaml.reconfigure_bk + cp b /config/edgecore.yaml + + pkill edgecore + + # check to edgecore process status + > pids + for i in 0 1 2 3; do + sleep 10 + { pidof edgecore || echo =\$i; } >> pids + done + [ \$(sort -u pids | wc -l) -le 2 ] && echo Reconfigure \$NODE_NAME edgecore successfully || { + echo Failed to reconfigure \$NODE_NAME edgecore >&2 + echo And recovery edgecore config yaml >&2 + cp a /config/edgecore.yaml + + # prevent daemonset execute this script too frequently + sleep 1800 + exit 1 + } + } + sleep inf +EOF + + # wait for this script to be executed + kubectl -n kubeedge rollout status --timeout=5m ds $script_name + # wait all edge nodes to be ready if restarted + kubectl wait --for=condition=ready node -l node-role.kubernetes.io/edge= -function setup_edgemesh() { - # TODO: wait for edgemesh one line installer - : + # keep this daemonset script for debugging + # kubectl -n kubeedge delete ds $script_name + +} + +reconfigure_cloudcore() { + + local config_file=/etc/kubeedge/config/cloudcore.yaml + local yq_cmds=$1 + + run_in_control_plane cat $config_file | + 
_change_detect_yaml_change "$yq_cmds" | + run_in_control_plane bash -euc " + cat > cc.yaml + ! grep -q . cc.yaml || { + echo reconfigure and restart cloudcore + cp $config_file ${config_file}.reconfigure_bk + cp cc.yaml $config_file + pkill cloudcore || true + # TODO: use a systemd service + (cloudcore &>> /var/log/kubeedge/cloudcore.log &) + } + + " + echo Reconfigure cloudcore successfully +} + +function install_edgemesh() { + if ((NUM_EDGE_NODES<1)); then + # no edge node, no edgemesh + return + fi + + local server_node_name + if ((NUM_CLOUD_WORKER_NODES>0)); then + server_node_name=${CLUSTER_NAME}-worker + else + server_node_name=${CLUSTER_NAME}-control-plane + fi + + echo Installing edgemesh with server on $server_node_name + # enable Local APIServer + reconfigure_cloudcore '.modules.dynamicController.enable=true' + + reconfigure_edgecore ' + .modules.edged.clusterDNS="169.254.96.16" + | .modules.edged.clusterDomain="cluster.local" + | .modules.metaManager.metaServer.enable=true + ' + + # no server.publicIP + # since allinone is in flat network, we just use private ip for edgemesh server + helm upgrade --install edgemesh \ + --set server.nodeName=$server_node_name \ + https://raw.githubusercontent.com/kubeedge/edgemesh/main/build/helm/edgemesh.tgz + + echo Install edgemesh successfully } function gen_cni_config() { @@ -365,12 +528,24 @@ function create_and_setup_edgenodes() { " # fix cni config file gen_cni_config | docker exec -i $containername tee /etc/cni/net.d/10-edgecni.conflist >/dev/null - + + { + # wait edge node to be created at background + nwait=20 + for((i=0;i/dev/null && break + sleep 3 + done + } & + done + # wait all edge nodes to be created + wait + } function clean_edgenodes() { - for cid in $(docker ps -a --filter label=sedna.io=sedna-mini-edge -q); do + for cid in $(docker ps -a --filter label=sedna.io=sedna-mini-edge -q); do docker stop $cid; docker rm $cid done } @@ -387,8 +562,6 @@ function setup_cloud() { setup_control_kubeconfig 
setup_cloudcore - - setup_edgemesh } function clean_cloud() { @@ -434,7 +607,7 @@ function get_latest_version() { # ... # "tag_name": "v1.0.0", # ... - + # Sometimes this will reach rate limit # https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting local url=https://api.github.com/repos/$repo/releases/latest @@ -458,7 +631,7 @@ function arch() { function _download_tool() { local name=$1 url=$2 - local file=/usr/local/bin/$name + local file=/usr/local/bin/$name curl -Lo $file $url chmod +x $file } @@ -488,9 +661,17 @@ function ensure_kubectl() { ensure_tool kubectl https://dl.k8s.io/release/v${version/v}/bin/linux/$(arch)/kubectl } +function ensure_helm() { + if check_command_exists helm; then + return + fi + curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash +} + function ensure_tools() { ensure_kind ensure_kubectl + ensure_helm } function main() { @@ -502,6 +683,11 @@ function main() { create) setup_cloud setup_edge + # wait all nodes to be ready + kubectl wait --for=condition=ready node --all + + # edgemesh need to be installed before sedna + install_edgemesh install_sedna log_info "Mini Sedna is created successfully" ;; From 226ee6ee10d21cc0ad84cba8d0eeea49e36f02a0 Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Tue, 4 Jan 2022 19:41:50 +0800 Subject: [PATCH 5/7] LL: add support of multi-node train/eval/infer Signed-off-by: JimmyYang20 --- .../lifelonglearning/downstream.go | 97 ++- .../lifelonglearning/lifelonglearningjob.go | 86 +- .../lifelonglearning/lifelonglearningjob.go | 753 ++++++++++-------- 3 files changed, 576 insertions(+), 360 deletions(-) diff --git a/pkg/globalmanager/controllers/lifelonglearning/downstream.go b/pkg/globalmanager/controllers/lifelonglearning/downstream.go index 652bc79b1..eee7a306c 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/downstream.go +++ b/pkg/globalmanager/controllers/lifelonglearning/downstream.go @@ -17,9 +17,12 @@ limitations under the License. 
package lifelonglearning import ( - "fmt" + "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/globalmanager/runtime" @@ -36,17 +39,95 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro // more details at https://github.com/kubernetes/kubernetes/issues/3030 job.Kind = KindName - // Here only propagate to the nodes with non empty name + dataName := job.Spec.Dataset.Name + // LC has dataset object on this node that may call dataset node + var dsNodeName string + ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err) + } else { + dsNodeName = ds.Spec.NodeName + } + + var trainNodeName string + var evalNodeName string + var deployNodeName string - // FIXME(llhuii): only the case that all workers having the same nodeName are support, - // will support Spec.NodeSelector and different nodeName. 
- nodeName := job.Spec.TrainSpec.Template.Spec.NodeName - if len(nodeName) == 0 { - return fmt.Errorf("empty node name") + getAnnotationsNodeName := func(nodeName sednav1.LLJobStage) string { + return runtime.AnnotationsKeyPrefix + string(nodeName) + } + ann := job.GetAnnotations() + if ann != nil { + trainNodeName = ann[getAnnotationsNodeName(sednav1.LLJobTrain)] + evalNodeName = ann[getAnnotationsNodeName(sednav1.LLJobEval)] + deployNodeName = ann[getAnnotationsNodeName(sednav1.LLJobDeploy)] + } + + if eventType == watch.Deleted { + // delete jobs from all LCs + nodes := sets.NewString(dsNodeName, trainNodeName, evalNodeName, deployNodeName) + + for node := range nodes { + c.sendToEdgeFunc(node, eventType, job) + } + + return nil + } + + if dsNodeName == "" { + return nil + } + + jobConditions := job.Status.Conditions + if len(jobConditions) == 0 { + return nil + } + + latestCondition := jobConditions[len(jobConditions)-1] + currentType := latestCondition.Type + jobStage := latestCondition.Stage + + syncJobWithNodeName := func(nodeName string) { + if err := c.sendToEdgeFunc(nodeName, eventType, job); err != nil { + klog.Warningf("Error to sync lifelong learning job %s to node %s in stage %s: %v", + job.Name, nodeName, jobStage, err) + } } runtime.InjectSecretAnnotations(c.kubeClient, job, job.Spec.CredentialName) - return c.sendToEdgeFunc(nodeName, eventType, job) + + // isJobResidentNode checks whether nodeName is a job resident node + isJobResidentNode := func(nodeName string) bool { + // the node where LC monitors dataset and the node where inference worker is running are job resident node + if nodeName == dsNodeName || nodeName == deployNodeName { + return true + } + return false + } + + doJobStageEvent := func(nodeName string) { + if currentType == sednav1.LLJobStageCondWaiting { + syncJobWithNodeName(dsNodeName) + } else if currentType == sednav1.LLJobStageCondRunning { + syncJobWithNodeName(nodeName) + } else if currentType == 
sednav1.LLJobStageCondCompleted || currentType == sednav1.LLJobStageCondFailed { + if !isJobResidentNode(nodeName) { + // delete LC's job from nodeName that's different from dataset node when worker's status is completed or failed. + c.sendToEdgeFunc(nodeName, watch.Deleted, job) + } + } + } + + switch jobStage { + case sednav1.LLJobTrain: + doJobStageEvent(trainNodeName) + case sednav1.LLJobEval: + doJobStageEvent(evalNodeName) + case sednav1.LLJobDeploy: + doJobStageEvent(deployNodeName) + } + + return nil } func (c *Controller) SetDownstreamSendFunc(f runtime.DownstreamSendFunc) error { diff --git a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go index 4c3a2f690..b2ed1e1c8 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go @@ -18,7 +18,9 @@ package lifelonglearning import ( "context" + "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/types" "strings" "time" @@ -294,10 +296,51 @@ func (c *Controller) sync(key string) (bool, error) { return forget, err } +// setWorkerNodeNameOfJob sets the worker nodeName of the specified job +// which is used for downstream to sync job info to the specified LC located in nodeName. 
+func (c *Controller) setWorkerNodeNameOfJob(job *sednav1.LifelongLearningJob, jobStage string, nodeName string) error { + key := runtime.AnnotationsKeyPrefix + jobStage + + return c.addJobAnnotations(job, key, nodeName) +} + +// addJobAnnotations adds info in job annotations +func (c *Controller) addJobAnnotations(job *sednav1.LifelongLearningJob, key string, value string) error { + ann := job.GetAnnotations() + if ann[key] == value { + // already set + return nil + } + + patchData := metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{key: value}}} + + patchDataBytes, err := json.Marshal(&patchData) + if err != nil { + return err + } + + jobClient := c.client.LifelongLearningJobs(job.Namespace) + return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error { + newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + annotations := newJob.GetAnnotations() + if annotations[key] == value { + return nil + } + + _, err = jobClient.Patch(context.TODO(), job.Name, types.MergePatchType, patchDataBytes, metav1.PatchOptions{}) + return err + }) +} + // transitJobState transit job to next state func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, error) { var initialType sednav1.LLJobStageConditionType - var latestCondition = sednav1.LLJobCondition{ + var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{ Stage: sednav1.LLJobTrain, Type: initialType, } @@ -305,14 +348,16 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er var newConditionType sednav1.LLJobStageConditionType var needUpdated = false - var podStatus = v1.PodUnknown + var podStatus v1.PodPhase = v1.PodUnknown + var pod *v1.Pod + jobConditions := job.Status.Conditions if len(jobConditions) > 0 { // get latest pod and pod status latestCondition = (jobConditions)[len(jobConditions)-1] klog.V(2).Infof("lifelonglearning job %v/%v latest 
stage %v:", job.Namespace, job.Name, latestCondition.Stage) - pod := c.getSpecifiedPods(job, string(latestCondition.Stage)) + pod = c.getSpecifiedPods(job, string(latestCondition.Stage)) if pod != nil { podStatus = pod.Status.Phase @@ -337,25 +382,30 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er err = c.restartInferPod(job) if err != nil { klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) - } else { - klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) + return needUpdated, err } - } else if podStatus != v1.PodPending && podStatus != v1.PodRunning { - err = c.createPod(job, jobStage) - } - if err != nil { - return needUpdated, err + + klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) + newConditionType = sednav1.LLJobStageCondCompleted + } else { + if podStatus != v1.PodPending && podStatus != v1.PodRunning { + err = c.createPod(job, jobStage) + if err != nil { + return needUpdated, err + } + } + newConditionType = sednav1.LLJobStageCondStarting } - newConditionType = sednav1.LLJobStageCondStarting case sednav1.LLJobStageCondStarting, sednav1.LLJobStageCondRunning: if podStatus == v1.PodRunning { - if jobStage == sednav1.LLJobDeploy { - newConditionType = sednav1.LLJobStageCondCompleted - } else { - // watch pod status, if pod running, set type running - newConditionType = sednav1.LLJobStageCondRunning + // add nodeName to job + if err := c.setWorkerNodeNameOfJob(job, string(jobStage), pod.Spec.NodeName); err != nil { + return needUpdated, err } + + // watch pod status, if pod running, set type running + newConditionType = sednav1.LLJobStageCondRunning } else if podStatus == v1.PodSucceeded { // watch pod status, if pod completed, set type completed newConditionType = sednav1.LLJobStageCondCompleted @@ -541,7 +591,7 @@ func (c *Controller) createPod(job 
*sednav1.LifelongLearningJob, podtype sednav1 originalDataURLOrIndex = dataset.Spec.URL } - var workerParam = new(runtime.WorkerParam) + var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) if podtype == sednav1.LLJobTrain { workerParam.WorkerType = "Train" @@ -672,7 +722,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error { return err } - var workerParam = new(runtime.WorkerParam) + var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) workerParam.Mounts = append(workerParam.Mounts, runtime.WorkerMount{ URL: &runtime.MountURL{ diff --git a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go index 114ccff0e..05337b2c3 100644 --- a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go @@ -27,10 +27,12 @@ import ( "sync" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "github.com/kubeedge/sedna/cmd/sedna-lc/app/options" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" + gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/lifelonglearning" "github.com/kubeedge/sedna/pkg/globalmanager/runtime" "github.com/kubeedge/sedna/pkg/localcontroller/db" clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" @@ -42,20 +44,23 @@ import ( ) const ( + // JobIterationIntervalSeconds is interval time of each iteration of job + JobIterationIntervalSeconds = 10 + // DatasetHandlerIntervalSeconds is interval time of handling dataset + DatasetHandlerIntervalSeconds = 10 + // EvalSamplesCapacity is capacity of eval samples + EvalSamplesCapacity = 5 //KindName is kind of lifelong-learning-job resource KindName = "lifelonglearningjob" - // TrainPhase is the train phase - TrainPhase = "train" - // EvalPhase is the eval phase - EvalPhase = "eval" - // DeployPhase is the deploy phase - DeployPhase = "deploy" - 
// TriggerReadyStatus is the ready status about trigger TriggerReadyStatus = "ready" // TriggerCompletedStatus is the completed status about trigger TriggerCompletedStatus = "completed" + + AnnotationsRoundsKey = "sedna.io/rounds" + AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples" + AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval" ) // LifelongLearningJobManager defines lifelong-learning-job Manager @@ -70,58 +75,47 @@ type Manager struct { // LifelongLearningJob defines config for lifelong-learning-job type Job struct { sednav1.LifelongLearningJob - Dataset *dataset.Dataset - Done chan struct{} - Storage storage.Storage - JobConfig *LLJobConfig -} - -// LLJobConfig defines config for lifelong-learning-job -type LLJobConfig struct { - UniqueIdentifier string - Version int - Phase string - WorkerStatus string - TrainTrigger trigger.Base - TriggerStatus string - TriggerTime time.Time - TrainDataURL string - EvalDataURL string - OutputDir string - OutputConfig *LLOutputConfig - DataSamples *LLDataSamples - TrainModel *Model - DeployModel *Model - EvalResult *Model - Lock sync.Mutex + JobConfig *JobConfig +} + +// JobConfig defines config for lifelong-learning-job +type JobConfig struct { + UniqueIdentifier string + Rounds int + TrainTrigger trigger.Base + TriggerTime time.Time + TrainTriggerStatus string + EvalTriggerStatus string + DeployTriggerStatus string + TrainDataURL string + EvalDataURL string + OutputDir string + OutputConfig *OutputConfig + DataSamples *DataSamples + DeployModel *Model + Lock sync.Mutex + Dataset *dataset.Dataset + Storage storage.Storage + Done chan struct{} } type Model = clienttypes.Model -// LLOutputConfig defines config for job output -type LLOutputConfig struct { - SamplesOutput map[string]string - TrainOutput string - EvalOutput string +// OutputConfig defines config for job output +type OutputConfig struct { + SamplesOutput map[string]string `json:"trainData"` + TrainOutput string `json:"trainOutput"` + 
EvalOutput string `json:"evalOutput"` } -// LLDataSamples defines samples information -type LLDataSamples struct { - Numbers int +// DataSamples defines samples information +type DataSamples struct { + PreviousNumbers int TrainSamples []string EvalVersionSamples [][]string EvalSamples []string } -const ( - // LLJobIterationIntervalSeconds is interval time of each iteration of job - LLJobIterationIntervalSeconds = 10 - // LLHandlerDataIntervalSeconds is interval time of handling dataset - LLHandlerDataIntervalSeconds = 10 - // LLLLEvalSamplesCapacity is capacity of eval samples - LLEvalSamplesCapacity = 5 -) - // New creates a lifelong-learning-job manager func New(client clienttypes.ClientI, datasetManager *dataset.Manager, options *options.LocalControllerOptions) *Manager { lm := Manager{ @@ -143,8 +137,6 @@ func (lm *Manager) Insert(message *clienttypes.Message) error { job, ok := lm.LifelongLearningJobMap[name] if !ok { job = &Job{} - job.Storage = storage.Storage{IsLocalStorage: false} - job.Done = make(chan struct{}) lm.LifelongLearningJobMap[name] = job first = true } @@ -153,21 +145,14 @@ func (lm *Manager) Insert(message *clienttypes.Message) error { return err } - credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] - if credential != "" { - if err := job.Storage.SetCredential(credential); err != nil { - return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err) - } + if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil { + return err } if first { go lm.startJob(name) } - if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil { - return err - } - return nil } @@ -179,51 +164,43 @@ func (lm *Manager) startJob(name string) { return } - job.JobConfig = new(LLJobConfig) - jobConfig := job.JobConfig - jobConfig.UniqueIdentifier = name - - err = lm.initJob(job) + err = lm.initJob(job, name) if err != nil { - klog.Errorf("failed to init job (name=%s): 
%+v", jobConfig.UniqueIdentifier) + klog.Errorf("failed to init job (name=%s): %+v", name) return } klog.Infof("lifelong learning job(name=%s) is started", name) defer klog.Infof("lifelong learning job(name=%s) is stopped", name) + + // handle data from dataset go lm.handleData(job) - tick := time.NewTicker(LLJobIterationIntervalSeconds * time.Second) + tick := time.NewTicker(JobIterationIntervalSeconds * time.Second) for { select { - case <-job.Done: + case <-job.JobConfig.Done: return default: } - if job.Dataset == nil { - klog.V(3).Infof("job(name=%s) dataset not ready", - jobConfig.UniqueIdentifier) - - <-tick.C - continue - } + cond := lm.getLatestCondition(job) + jobStage := cond.Stage - switch jobConfig.Phase { - case TrainPhase: + switch jobStage { + case sednav1.LLJobTrain: err = lm.trainTask(job) - case EvalPhase: + case sednav1.LLJobEval: err = lm.evalTask(job) - case DeployPhase: + case sednav1.LLJobDeploy: err = lm.deployTask(job) default: - klog.Errorf("invalid phase: %s", jobConfig.Phase) + klog.Errorf("invalid phase: %s", jobStage) continue } if err != nil { - klog.Errorf("job(name=%s) complete the %s task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) + klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err) } <-tick.C @@ -234,40 +211,56 @@ func (lm *Manager) startJob(name string) { func (lm *Manager) trainTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - payload, ok, err := lm.triggerTrainTask(job) - if !ok { - return nil - } + latestCond := lm.getLatestCondition(job) + jobStage := latestCond.Stage + currentType := latestCond.Type - if err != nil { - klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) - return err + if currentType == sednav1.LLJobStageCondWaiting { + err := lm.loadDataset(job) + if err != nil 
|| jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w", + jobConfig.UniqueIdentifier, err) } - err = lm.Client.WriteMessage(payload, job.getHeader()) - if err != nil { - klog.Errorf("job(name=%s) failed to write message: %v", - jobConfig.UniqueIdentifier, err) - return err + if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier) } - jobConfig.TriggerStatus = TriggerCompletedStatus + initTriggerStatus(jobConfig) - klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", - jobConfig.UniqueIdentifier, jobConfig.Phase) - } + if jobConfig.TrainTriggerStatus == TriggerReadyStatus { + payload, ok, err := lm.triggerTrainTask(job) + if !ok { + return nil + } - if jobConfig.WorkerStatus == workertypes.FailedStatus { - klog.Warningf("found the %sing phase worker that ran failed, "+ - "back the training phase triggering task", jobConfig.Phase) - backLLTaskStatus(jobConfig) - } + if err != nil { + klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v", + jobConfig.UniqueIdentifier, jobStage, err) + job.JobConfig.Rounds-- + return err + } + + err = lm.Client.WriteMessage(payload, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err) + job.JobConfig.Rounds-- + return err + } + + forwardSamples(jobConfig, jobStage) - if jobConfig.WorkerStatus == workertypes.CompletedStatus { - klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase) - nextLLTask(jobConfig) + err = lm.saveJobToDB(job) + if err != nil { + klog.Errorf("job(%s) failed to save job to db: %v", + jobConfig.UniqueIdentifier, err) + // continue anyway + } + + jobConfig.TrainTriggerStatus = TriggerCompletedStatus + klog.Infof("job(name=%s) complete the %sing phase triggering task 
successfully", + jobConfig.UniqueIdentifier, jobStage) + } } return nil @@ -277,35 +270,37 @@ func (lm *Manager) trainTask(job *Job) error { func (lm *Manager) evalTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - payload, err := lm.triggerEvalTask(job) - if err != nil { - klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v", - jobConfig.UniqueIdentifier, jobConfig.Phase, err) - return err - } + latestCond := lm.getLatestCondition(job) + jobStage := latestCond.Stage + currentType := latestCond.Type - err = lm.Client.WriteMessage(payload, job.getHeader()) - if err != nil { - return err + if currentType == sednav1.LLJobStageCondWaiting { + err := lm.loadDataset(job) + if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w", + jobConfig.UniqueIdentifier, err) } - jobConfig.TriggerStatus = TriggerCompletedStatus + if jobConfig.EvalTriggerStatus == TriggerReadyStatus { + payload, err := lm.triggerEvalTask(job) + if err != nil { + klog.Errorf("job(%s) completed the %sing phase triggering task failed: %v", + jobConfig.UniqueIdentifier, jobStage, err) + return err + } - klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", - jobConfig.UniqueIdentifier, jobConfig.Phase) - } + err = lm.Client.WriteMessage(payload, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err) + return err + } - if jobConfig.WorkerStatus == workertypes.FailedStatus { - msg := fmt.Sprintf("job(name=%s) found the %sing phase worker that ran failed, "+ - "back the training phase triggering task", jobConfig.UniqueIdentifier, jobConfig.Phase) - klog.Errorf(msg) - return fmt.Errorf(msg) - } + forwardSamples(jobConfig, jobStage) - if jobConfig.WorkerStatus == 
workertypes.CompletedStatus { - klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase) - nextLLTask(jobConfig) + jobConfig.EvalTriggerStatus = TriggerCompletedStatus + klog.Infof("job(%s) completed the %sing phase triggering task successfully", + jobConfig.UniqueIdentifier, jobStage) + } } return nil @@ -313,39 +308,37 @@ func (lm *Manager) evalTask(job *Job) error { // deployTask starts deploy task func (lm *Manager) deployTask(job *Job) error { - jobConfig := job.JobConfig + if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus { + jobConfig := job.JobConfig + var err error + + status := clienttypes.UpstreamMessage{Phase: string(sednav1.LLJobDeploy)} + models := lm.getJobStageModel(job, sednav1.LLJobDeploy) + if models != nil { + err = lm.updateDeployModelFile(job, models[0].URL, jobConfig.DeployModel.URL) + if err != nil { + status.Status = string(sednav1.LLJobStageCondFailed) + klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err) + return err + } - if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - status := clienttypes.UpstreamMessage{} - status.Phase = DeployPhase - deployModel, err := lm.deployModel(job) - if err != nil { - klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err) + status.Status = string(sednav1.LLJobStageCondReady) + status.Input = &clienttypes.Input{Models: []Model{{Format: models[0].Format, URL: models[0].URL}}} } else { - klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier) - } - if err != nil || deployModel == nil { - status.Status = workertypes.FailedStatus - } else { - status.Status = workertypes.ReadyStatus - status.Input = &clienttypes.Input{ - Models: []Model{ - *deployModel, - }, - } + klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier) + status.Status = string(sednav1.LLJobStageCondCompleted) } 
- if err = lm.Client.WriteMessage(status, job.getHeader()); err != nil { + err = lm.Client.WriteMessage(status, job.getHeader()) + if err != nil { + klog.Errorf("job(%s) completed the %s task failed: %v", + jobConfig.UniqueIdentifier, sednav1.LLJobDeploy, err) return err } - jobConfig.TriggerStatus = TriggerCompletedStatus + job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus + klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.LLJobDeploy) } - - nextLLTask(jobConfig) - - klog.Infof("job(name=%s) complete the deploy task successfully", jobConfig.UniqueIdentifier) - return nil } @@ -365,19 +358,22 @@ func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { return nil, false, nil } - jobConfig.Version++ + job.JobConfig.Rounds++ + rounds := jobConfig.Rounds var dataIndexURL string - jobConfig.TrainDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.TrainSamples, - jobConfig.OutputConfig.SamplesOutput["train"]) + jobConfig.TrainDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.TrainSamples, + jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err != nil { - klog.Errorf("train phase: write samples to the file(%s) is failed, error: %v", jobConfig.TrainDataURL, err) + job.JobConfig.Rounds-- + klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v", + jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err) return nil, false, err } dataURL := jobConfig.TrainDataURL - outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Version)}, "/") - if job.Storage.IsLocalStorage { + outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/") + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL) 
outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) @@ -389,10 +385,11 @@ func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { OutputDir: outputDir, } msg := clienttypes.UpstreamMessage{ - Phase: TrainPhase, - Status: workertypes.ReadyStatus, + Phase: string(sednav1.LLJobTrain), + Status: string(sednav1.LLJobStageCondReady), Input: &input, } + jobConfig.TriggerTime = time.Now() return &msg, true, nil } @@ -402,89 +399,83 @@ func (lm *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, erro jobConfig := job.JobConfig var err error + latestCondition := lm.getLatestCondition(job) + + ms := lm.getJobStageModel(job, latestCondition.Stage) + if ms == nil { + return nil, err + } + var dataIndexURL string - jobConfig.EvalDataURL, dataIndexURL, err = job.writeLLJSamples(jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"]) + jobConfig.EvalDataURL, dataIndexURL, err = lm.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"], + job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix) if err != nil { - klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v", + klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v", jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err) return nil, err } - var models []Model - models = append(models, Model{ - Format: jobConfig.TrainModel.Format, - URL: jobConfig.TrainModel.URL, - }) - dataURL := jobConfig.EvalDataURL - outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Version)}, "/") - if job.Storage.IsLocalStorage { + outputDir := strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds)}, "/") + if jobConfig.Storage.IsLocalStorage { dataURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataURL) dataIndexURL = util.TrimPrefixPath(lm.VolumeMountPrefix, dataIndexURL) outputDir 
= util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) } input := clienttypes.Input{ - Models: models, + Models: ms, DataURL: dataURL, DataIndexURL: dataIndexURL, OutputDir: outputDir, } msg := &clienttypes.UpstreamMessage{ - Phase: EvalPhase, - Status: workertypes.ReadyStatus, + Phase: string(sednav1.LLJobEval), + Status: string(sednav1.LLJobStageCondReady), Input: &input, } return msg, nil } -// deployModel deploys model -func (lm *Manager) deployModel(job *Job) (*Model, error) { - jobConfig := job.JobConfig - - model := &Model{} - model = jobConfig.EvalResult - - if job.Storage.IsLocalStorage { - model.URL = util.AddPrefixPath(lm.VolumeMountPrefix, model.URL) +// updateDeployModelFile updates deploy model file +func (lm *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error { + if job.JobConfig.Storage.IsLocalStorage { + trainedModel = util.AddPrefixPath(lm.VolumeMountPrefix, trainedModel) } - deployModelURL := jobConfig.DeployModel.URL - if err := job.Storage.CopyFile(model.URL, deployModelURL); err != nil { - return nil, fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v", - model.URL, deployModelURL, err) + if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil { + return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w", + trainedModel, deployModel, err) } - klog.V(4).Infof("copy model(url=%s) to the deploy model(url=%s) successfully", model.URL, deployModelURL) - klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, model.URL) + klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel) - return model, nil + return nil } // createOutputDir creates the job output dir -func (job *Job) createOutputDir(jobConfig *LLJobConfig) error { +func (job *Job) createOutputDir(jobConfig *JobConfig) error { outputDir := jobConfig.OutputDir dirNames := 
[]string{"data/train", "data/eval", "train", "eval"} - // lifelong_kb_index.pkl - if job.Storage.IsLocalStorage { + if job.JobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(outputDir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir) + klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, outputDir, err) return err } for _, v := range dirNames { dir := path.Join(outputDir, v) if err := util.CreateFolder(dir); err != nil { - klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir) + klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, dir, err) return err } } } - outputConfig := LLOutputConfig{ + outputConfig := OutputConfig{ SamplesOutput: map[string]string{ "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"), "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"), @@ -497,8 +488,18 @@ func (job *Job) createOutputDir(jobConfig *LLJobConfig) error { return nil } +func (lm *Manager) getLatestCondition(job *Job) sednav1.LLJobCondition { + jobConditions := job.Status.Conditions + var latestCondition sednav1.LLJobCondition = sednav1.LLJobCondition{} + if len(jobConditions) > 0 { + // get latest pod and pod status + latestCondition = jobConditions[len(jobConditions)-1] + } + return latestCondition +} + // createFile creates data file and data index file -func (job *Job) createFile(dir string, format string, isLocalStorage bool) (string, string) { +func createFile(dir string, format string, isLocalStorage bool) (string, string) { switch strings.ToLower(format) { case dataset.TXTFormat: if isLocalStorage { @@ -512,16 +513,17 @@ func (job *Job) createFile(dir string, format string, isLocalStorage bool) (stri return "", "" } -// writeLLJSamples writes samples information to a file -func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, error) 
{ - version := job.JobConfig.Version - format := job.Dataset.Spec.Format - urlPrefix := job.Dataset.URLPrefix +// writeSamples writes samples information to a file +func (lm *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) { + if samples == nil { + return "", "", fmt.Errorf("not samples") + } - subDir := strings.Join([]string{dir, strconv.Itoa(version)}, "/") - fileURL, absURLFile := job.createFile(subDir, format, job.Dataset.Storage.IsLocalStorage) + jobConfig := job.JobConfig + subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/") + fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage) - if job.Storage.IsLocalStorage { + if jobConfig.Storage.IsLocalStorage { if err := util.CreateFolder(subDir); err != nil { return "", "", err } @@ -529,7 +531,7 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - if !job.Dataset.Storage.IsLocalStorage && absURLFile != "" { + if !jobConfig.Dataset.Storage.IsLocalStorage && absURLFile != "" { tempSamples := util.ParsingDatasetIndex(samples, urlPrefix) if err := job.writeByLine(tempSamples, absURLFile, format); err != nil { return "", "", err @@ -544,13 +546,13 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - localFileURL, localAbsURLFile := job.createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage) + localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage) if err := job.writeByLine(samples, localFileURL, format); err != nil { return "", "", err } - if err := job.Storage.Upload(localFileURL, fileURL); err != nil { + if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil { return "", "", err } @@ -561,7 +563,7 @@ func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, e return "", "", err } - 
if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil { + if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil { return "", "", err } @@ -584,7 +586,7 @@ func (job *Job) writeByLine(samples []string, fileURL string, format string) err w := bufio.NewWriter(file) if format == "csv" { - _, _ = fmt.Fprintln(w, job.Dataset.DataSource.Header) + _, _ = fmt.Fprintln(w, job.JobConfig.Dataset.DataSource.Header) } for _, line := range samples { @@ -605,62 +607,61 @@ func (job *Job) writeByLine(samples []string, fileURL string, format string) err // handleData updates samples information func (lm *Manager) handleData(job *Job) { - tick := time.NewTicker(LLHandlerDataIntervalSeconds * time.Second) + tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second) jobConfig := job.JobConfig iterCount := 0 for { select { - case <-job.Done: + case <-jobConfig.Done: return default: } - // in case dataset is not synced to LC before job synced to LC - // here call loadDataset in each period - err := lm.loadDataset(job) if iterCount%100 == 0 { - klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier) + klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier) } iterCount++ - if err != nil { - klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v", - jobConfig.UniqueIdentifier, - err) + + if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil { + // already loaded dataset <-tick.C continue } - dataset := job.Dataset + dataset := jobConfig.Dataset + currentNumberOfSamples := dataset.DataSource.NumberOfSamples + previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers - if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers { + if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples { samples := dataset.DataSource.TrainSamples - trainNum := int(job.Spec.Dataset.TrainProb * 
float64(len(samples)-jobConfig.DataSamples.Numbers)) + newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples + trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples)) jobConfig.Lock.Lock() jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples, - samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...) - klog.Infof("job(name=%s) current train samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples)) + samples[previousNumberOfSamples:previousNumberOfSamples+trainNum]...) + klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum) jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples, - samples[(jobConfig.DataSamples.Numbers+trainNum+1):]) + samples[previousNumberOfSamples+trainNum:]) jobConfig.Lock.Unlock() for _, v := range jobConfig.DataSamples.EvalVersionSamples { jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...) 
} - klog.Infof("job(name=%s) current eval samples nums is %d", - jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples)) + klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples)) - jobConfig.DataSamples.Numbers = len(samples) + jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples } + <-tick.C } } +// loadDataset loads dataset information func (lm *Manager) loadDataset(job *Job) error { - if job.Dataset != nil { + if job.JobConfig.Dataset != nil { // already loaded return nil } @@ -671,30 +672,37 @@ func (lm *Manager) loadDataset(job *Job) error { return fmt.Errorf("not exists dataset(name=%s)", datasetName) } - jobConfig := job.JobConfig - jobConfig.DataSamples = &LLDataSamples{ - Numbers: 0, - TrainSamples: make([]string, 0), - EvalVersionSamples: make([][]string, 0), - EvalSamples: make([]string, 0), - } - - job.Dataset = dataset + job.JobConfig.Dataset = dataset return nil } // initJob inits the job object -func (lm *Manager) initJob(job *Job) error { +func (lm *Manager) initJob(job *Job, name string) error { + job.JobConfig = new(JobConfig) + jobConfig := job.JobConfig - jobConfig.TrainModel = new(Model) - jobConfig.EvalResult = new(Model) + jobConfig.UniqueIdentifier = name + + jobConfig.Storage = storage.Storage{IsLocalStorage: false} + credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] + if credential != "" { + if err := job.JobConfig.Storage.SetCredential(credential); err != nil { + return fmt.Errorf("failed to set storage credential: %w", err) + } + } + + jobConfig.Done = make(chan struct{}) jobConfig.Lock = sync.Mutex{} + jobConfig.Rounds = 0 + + jobConfig.DataSamples = &DataSamples{ + PreviousNumbers: 0, + TrainSamples: make([]string, 0), + EvalVersionSamples: make([][]string, 0), + EvalSamples: make([]string, 0), + } - jobConfig.Version = 0 - jobConfig.Phase = TrainPhase - jobConfig.WorkerStatus = workertypes.ReadyStatus - jobConfig.TriggerStatus 
= TriggerReadyStatus - trainTrigger, err := newLLTrigger(job.Spec.TrainSpec.Trigger) + trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger) if err != nil { return fmt.Errorf("failed to init train trigger: %+w", err) } @@ -702,13 +710,13 @@ func (lm *Manager) initJob(job *Job) error { outputDir := job.Spec.OutputDir - isLocalURL, err := job.Storage.IsLocalURL(outputDir) + isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir) if err != nil { - return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir) + return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", job.Name, outputDir, err) } if isLocalURL { - job.Storage.IsLocalStorage = true + jobConfig.Storage.IsLocalStorage = true outputDir = util.AddPrefixPath(lm.VolumeMountPrefix, outputDir) } @@ -723,10 +731,22 @@ func (lm *Manager) initJob(job *Job) error { URL: strings.Join([]string{strings.TrimRight(outputDir, "/"), "deploy/index.pkl"}, "/"), } + if err := lm.updateJobFromDB(job); err != nil { + klog.Errorf("job(%s) failed to update job from db: %v", name, err) + } + + initTriggerStatus(jobConfig) + return nil } -func newLLTrigger(t sednav1.LLTrigger) (trigger.Base, error) { +func initTriggerStatus(jobConfig *JobConfig) { + jobConfig.TrainTriggerStatus = TriggerReadyStatus + jobConfig.EvalTriggerStatus = TriggerReadyStatus + jobConfig.DeployTriggerStatus = TriggerReadyStatus +} + +func newTrigger(t sednav1.LLTrigger) (trigger.Base, error) { // convert trigger to map triggerMap := make(map[string]interface{}) c, err := json.Marshal(t) @@ -741,55 +761,83 @@ func newLLTrigger(t sednav1.LLTrigger) (trigger.Base, error) { return trigger.NewTrigger(triggerMap) } -// forwardSamplesLL deletes the samples information in the memory -func forwardSamplesLL(jobConfig *LLJobConfig) { - switch jobConfig.Phase { - case TrainPhase: - { - jobConfig.Lock.Lock() - jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0] - jobConfig.Lock.Unlock() - } - case 
EvalPhase: - { - if len(jobConfig.DataSamples.EvalVersionSamples) > LLEvalSamplesCapacity { - jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:] +// getModelsFromJobConditions gets models from job condition +func (lm *Manager) getModelsFromJobConditions(jobConditions []sednav1.LLJobCondition, stage sednav1.LLJobStage, currentType sednav1.LLJobStageConditionType, dataType string) []Model { + // TODO: runtime.type changes to common.type for gm and lc + for i := len(jobConditions) - 1; i >= 0; i-- { + var cond gmtypes.ConditionData + jobCond := jobConditions[i] + if jobCond.Stage == stage && jobCond.Type == currentType { + if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil { + continue + } + + if dataType == "input" { + if cond.Input == nil { + continue + } + + return cond.Input.Models + } else if dataType == "output" { + if cond.Output == nil { + continue + } + + return cond.Output.Models } } } -} -// backLLTaskStatus backs train task status -func backLLTaskStatus(jobConfig *LLJobConfig) { - jobConfig.Phase = TrainPhase - initLLTaskStatus(jobConfig) + return nil } -// initLLTaskStatus inits task status -func initLLTaskStatus(jobConfig *LLJobConfig) { - jobConfig.WorkerStatus = workertypes.ReadyStatus - jobConfig.TriggerStatus = TriggerReadyStatus -} +// getEvalResult gets eval result from job conditions +func (lm *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) { + jobConditions := job.Status.Conditions + models := lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output") -// nextLLTask converts next task status -func nextLLTask(jobConfig *LLJobConfig) { - switch jobConfig.Phase { - case TrainPhase: - { - forwardSamplesLL(jobConfig) - initLLTaskStatus(jobConfig) - jobConfig.Phase = EvalPhase + var result []map[string][]float64 + var err error + for _, m := range models { + bytes, err := json.Marshal(m.Metrics) + if err != nil { + return nil, err } - case 
EvalPhase: - { - forwardSamplesLL(jobConfig) - initLLTaskStatus(jobConfig) - jobConfig.Phase = DeployPhase + data := make(map[string][]float64) + if err = json.Unmarshal(bytes, &data); err != nil { + return nil, err } - case DeployPhase: - { - backLLTaskStatus(jobConfig) + + result = append(result, data) + } + return result, err +} + +// getJobStageModel gets model from job conditions for eval/deploy +func (lm *Manager) getJobStageModel(job *Job, jobStage sednav1.LLJobStage) (models []Model) { + jobConditions := job.Status.Conditions + + switch jobStage { + case sednav1.LLJobEval: + models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobTrain, sednav1.LLJobStageCondCompleted, "output") + case sednav1.LLJobDeploy: + models = lm.getModelsFromJobConditions(jobConditions, sednav1.LLJobEval, sednav1.LLJobStageCondCompleted, "output") + } + + return models +} + +// forwardSamples deletes the samples information in the memory +func forwardSamples(jobConfig *JobConfig, jobStage sednav1.LLJobStage) { + switch jobStage { + case sednav1.LLJobTrain: + jobConfig.Lock.Lock() + jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0] + jobConfig.Lock.Unlock() + case sednav1.LLJobEval: + if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity { + jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:] } } } @@ -798,8 +846,8 @@ func nextLLTask(jobConfig *LLJobConfig) { func (lm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) - if job, ok := lm.LifelongLearningJobMap[name]; ok && job.Done != nil { - close(job.Done) + if job, ok := lm.LifelongLearningJobMap[name]; ok && job.JobConfig.Done != nil { + close(job.JobConfig.Done) } delete(lm.LifelongLearningJobMap, name) @@ -811,7 +859,79 @@ func (lm *Manager) Delete(message *clienttypes.Message) error { return nil } -// Start starts 
LifelongLearningJob manager +// updateJobFromDB updates job from db +func (lm *Manager) updateJobFromDB(job *Job) error { + var err error + + previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier) + if err != nil { + return err + } + + m := metav1.ObjectMeta{} + if err != json.Unmarshal([]byte(previousJob.ObjectMeta), &m) { + return err + } + + rounds, ok := m.Annotations[AnnotationsRoundsKey] + if !ok { + return nil + } + + if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil { + return err + } + + numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey] + if !ok { + return nil + } + + if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil { + return err + } + + dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey] + if !ok { + return nil + } + + localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "") + + if !job.JobConfig.Storage.IsLocalStorage { + defer os.RemoveAll(localURL) + } + + if err != nil { + return err + } + + samples, err := dataset.GetSamples(dataFileOfEval) + if err != nil { + klog.Errorf("read file %s failed: %v", dataFileOfEval, err) + return err + } + + job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples) + + return nil +} + +// saveJobToDB saves job info to db +func (lm *Manager) saveJobToDB(job *Job) error { + ann := job.ObjectMeta.Annotations + if ann == nil { + ann = make(map[string]string) + } + + ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds) + ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers) + ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL + + return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec) +} + +// Start starts lifelong-learning-job manager func (lm *Manager) Start() error { go lm.monitorWorker() @@ -845,44 +965,9 @@ func (lm *Manager) monitorWorker() { Status: 
workerMessage.Status, Output: &wo, } - lm.Client.WriteMessage(msg, job.getHeader()) - - lm.handleWorkerMessage(job, workerMessage) - } -} - -// handleWorkerMessage handles message from worker -func (lm *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) { - jobPhase := job.JobConfig.Phase - workerKind := workerMessage.Kind - if jobPhase != workerKind { - klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier, - jobPhase, workerKind) - return - } - - var models []*Model - for _, result := range workerMessage.Results { - model := Model{ - Format: result["format"].(string), - URL: result["url"].(string)} - models = append(models, &model) - } - - model := &Model{} - if len(models) != 1 { - return - } - model = models[0] - - job.JobConfig.WorkerStatus = workerMessage.Status - - if job.JobConfig.WorkerStatus == workertypes.CompletedStatus { - switch job.JobConfig.Phase { - case TrainPhase: - job.JobConfig.TrainModel = model - case EvalPhase: - job.JobConfig.EvalResult = model + if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { + klog.Errorf("job(%s) failed to write message: %v", name, err) + continue } } } From 3f82140dac0da7a2d942bc13b91016c330660dc0 Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Tue, 4 Jan 2022 19:43:00 +0800 Subject: [PATCH 6/7] IL: Fix downstream bug after deleting dataset, the event of deleting job can be sent to LC. 
Signed-off-by: JimmyYang20 --- .../incrementallearning/downstream.go | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pkg/globalmanager/controllers/incrementallearning/downstream.go b/pkg/globalmanager/controllers/incrementallearning/downstream.go index 84be05287..d323ff602 100644 --- a/pkg/globalmanager/controllers/incrementallearning/downstream.go +++ b/pkg/globalmanager/controllers/incrementallearning/downstream.go @@ -62,18 +62,15 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro // more details at https://github.com/kubernetes/kubernetes/issues/3030 job.Kind = KindName - jobConditions := job.Status.Conditions - if len(jobConditions) == 0 { - return nil - } - dataName := job.Spec.Dataset.Name + // LC has dataset object on this node that may call dataset node + var dsNodeName string ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{}) if err != nil { - return fmt.Errorf("dataset(%s/%s) not found", job.Namespace, dataName) + klog.Errorf("not found job(name=%s/%s)'s dataset, error: %v", job.Kind, job.Name, err) + } else { + dsNodeName = ds.Spec.NodeName } - // LC has dataset object on this node that may call dataset node - dsNodeName := ds.Spec.NodeName var trainNodeName string var evalNodeName string @@ -102,6 +99,15 @@ func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) erro return nil } + if dsNodeName == "" { + return nil + } + + jobConditions := job.Status.Conditions + if len(jobConditions) == 0 { + return nil + } + latestCondition := jobConditions[len(jobConditions)-1] currentType := latestCondition.Type jobStage := latestCondition.Stage From 850374e6efac3d3fa8083c7ebfc81f04a1be052b Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Tue, 4 Jan 2022 20:05:10 +0800 Subject: [PATCH 7/7] Support for modelarts adapter Signed-off-by: JimmyYang20 --- .../lifelonglearning/lifelonglearningjob.go | 97 ++++++++++++++----- 
.../lifelonglearning/lifelonglearningjob.go | 64 ++++++++++++ pkg/localcontroller/storage/minio.go | 17 ++++ pkg/localcontroller/storage/storage.go | 17 ++++ 4 files changed, 173 insertions(+), 22 deletions(-) diff --git a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go index b2ed1e1c8..c4e8c32ff 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go @@ -18,6 +18,7 @@ package lifelonglearning import ( "context" + "crypto/sha256" "encoding/json" "fmt" "k8s.io/apimachinery/pkg/types" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + lruexpirecache "k8s.io/apimachinery/pkg/util/cache" utilrand "k8s.io/apimachinery/pkg/util/rand" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -52,6 +54,8 @@ const ( KindName = "LifelongLearningJob" // Name is this controller name Name = "LifelongLearning" + // VirtualKubeletNode is virtual node + VirtualKubeletNode = "virtual-kubelet" ) // Kind contains the schema.GroupVersionKind for this controller type. @@ -82,6 +86,8 @@ type Controller struct { cfg *config.ControllerConfig sendToEdgeFunc runtime.DownstreamSendFunc + + lruExpireCache *lruexpirecache.LRUExpireCache } // Run starts the main goroutine responsible for watching and syncing jobs. 
@@ -379,14 +385,17 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er // include train, eval, deploy pod var err error if jobStage == sednav1.LLJobDeploy { - err = c.restartInferPod(job) - if err != nil { - klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) - return needUpdated, err - } + if !c.hasJobInCache(job) { + err = c.restartInferPod(job) + if err != nil { + klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) + return needUpdated, err + } - klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) - newConditionType = sednav1.LLJobStageCondCompleted + klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) + newConditionType = sednav1.LLJobStageCondCompleted + c.addJobToCache(job) + } } else { if podStatus != v1.PodPending && podStatus != v1.PodRunning { err = c.createPod(job, jobStage) @@ -406,10 +415,6 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er // watch pod status, if pod running, set type running newConditionType = sednav1.LLJobStageCondRunning - } else if podStatus == v1.PodSucceeded { - // watch pod status, if pod completed, set type completed - newConditionType = sednav1.LLJobStageCondCompleted - klog.V(2).Infof("lifelonglearning job %v/%v %v stage completed!", job.Namespace, job.Name, jobStage) } else if podStatus == v1.PodFailed { newConditionType = sednav1.LLJobStageCondFailed klog.V(2).Infof("lifelonglearning job %v/%v %v stage failed!", job.Namespace, job.Name, jobStage) @@ -491,6 +496,21 @@ func (c *Controller) getSpecifiedPods(job *sednav1.LifelongLearningJob, podType return latestPod } +func (c *Controller) getHas256(target interface{}) string { + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%v", target))) + return fmt.Sprintf("%x", h.Sum(nil)) 
+} + +func (c *Controller) addJobToCache(job *sednav1.LifelongLearningJob) { + c.lruExpireCache.Add(c.getHas256(job.Status), job, 10*time.Second) +} + +func (c *Controller) hasJobInCache(job *sednav1.LifelongLearningJob) bool { + _, ok := c.lruExpireCache.Get(c.getHas256(job.Status)) + return ok +} + func (c *Controller) restartInferPod(job *sednav1.LifelongLearningJob) error { inferPod := c.getSpecifiedPods(job, runtime.InferencePodType) if inferPod == nil { @@ -542,6 +562,18 @@ func IsJobFinished(j *sednav1.LifelongLearningJob) bool { return false } +func (c *Controller) addPodAnnotations(spec *v1.PodTemplateSpec, key string, value string) { + ann := spec.GetAnnotations() + if ann == nil { + ann = make(map[string]string) + } + + if _, ok := ann[key]; !ok { + ann[key] = value + spec.SetAnnotations(ann) + } +} + func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1.LLJobStage) (err error) { ctx := context.Background() var podTemplate *v1.PodTemplateSpec @@ -592,12 +624,20 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 } var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) + if podtype == sednav1.LLJobTrain { - workerParam.WorkerType = "Train" + workerParam.WorkerType = runtime.TrainPodType podTemplate = &job.Spec.TrainSpec.Template // Env parameters for train + c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType) + c.addPodAnnotations(podTemplate, "data", dataURL) + datasetUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + datasetUseInitializer = false + } + workerParam.Env = map[string]string{ "NAMESPACE": job.Namespace, "JOB_NAME": job.Name, @@ -621,7 +661,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ URL: dataURL, Secret: jobSecret, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, EnvName: "TRAIN_DATASET_URL", }, @@ -632,14 +672,25 @@ func (c 
*Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 Secret: datasetSecret, URL: originalDataURLOrIndex, Indirect: dataset.Spec.URL != originalDataURLOrIndex, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, EnvName: "ORIGINAL_DATASET_URL", }, ) } else { podTemplate = &job.Spec.EvalSpec.Template - workerParam.WorkerType = "Eval" + workerParam.WorkerType = runtime.EvalPodType + + c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType) + c.addPodAnnotations(podTemplate, "data", dataURL) + datasetUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + datasetUseInitializer = false + } + modelUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + modelUseInitializer = false + } // Configure Env information for eval by initial WorkerParam workerParam.Env = map[string]string{ @@ -656,7 +707,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 modelMountURLs = append(modelMountURLs, runtime.MountURL{ URL: url, Secret: jobSecret, - DownloadByInitializer: true, + DownloadByInitializer: modelUseInitializer, }) } workerParam.Mounts = append(workerParam.Mounts, @@ -679,7 +730,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ URL: dataURL, Secret: datasetSecret, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, Name: "datasets", EnvName: "TEST_DATASET_URL", @@ -689,7 +740,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ Secret: datasetSecret, URL: originalDataURLOrIndex, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, Indirect: dataset.Spec.URL != originalDataURLOrIndex, }, Name: "origin-dataset", @@ -744,6 +795,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error { } workerParam.WorkerType = runtime.InferencePodType + 
c.addPodAnnotations(&job.Spec.DeploySpec.Template, "type", workerParam.WorkerType) workerParam.HostNetwork = true // create edge pod @@ -764,10 +816,11 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")}) jc := &Controller{ - kubeClient: cc.KubeClient, - client: cc.SednaClient.SednaV1alpha1(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), - cfg: cfg, + kubeClient: cc.KubeClient, + client: cc.SednaClient.SednaV1alpha1(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), + cfg: cfg, + lruExpireCache: lruexpirecache.NewLRUExpireCache(10), } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go index 05337b2c3..a5763fc6d 100644 --- a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go @@ -20,8 +20,10 @@ import ( "bufio" "encoding/json" "fmt" + "io/ioutil" "os" "path" + "path/filepath" "strconv" "strings" "sync" @@ -61,6 +63,9 @@ const ( AnnotationsRoundsKey = "sedna.io/rounds" AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples" AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval" + + // WorkerS3StatusHandlerIntervalSeconds is interval time of handling s3 status of worker + WorkerS3StatusHandlerIntervalSeconds = 30 ) // LifelongLearningJobManager defines lifelong-learning-job Manager @@ -257,6 +262,8 @@ func (lm *Manager) trainTask(job *Job) error { // continue anyway } + go lm.monitorS3Worker(job, sednav1.LLJobTrain) + jobConfig.TrainTriggerStatus = TriggerCompletedStatus 
klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", jobConfig.UniqueIdentifier, jobStage) @@ -297,6 +304,8 @@ func (lm *Manager) evalTask(job *Job) error { forwardSamples(jobConfig, jobStage) + go lm.monitorS3Worker(job, sednav1.LLJobEval) + jobConfig.EvalTriggerStatus = TriggerCompletedStatus klog.Infof("job(%s) completed the %sing phase triggering task successfully", jobConfig.UniqueIdentifier, jobStage) @@ -968,7 +977,62 @@ func (lm *Manager) monitorWorker() { if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { klog.Errorf("job(%s) failed to write message: %v", name, err) continue + } else { + klog.Infof("job(%s) write message(%v) to GM", name, msg) + } + } +} + +func (lm *Manager) monitorS3Worker(job *Job, stage sednav1.LLJobStage) { + jobConfig := job.JobConfig + var statusFile string + switch stage { + case sednav1.LLJobTrain: + statusFile = strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/") + case sednav1.LLJobEval: + statusFile = strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/") + } + + tempLocalFile := filepath.Join(os.TempDir(), "status.json") + for { + time.Sleep(WorkerS3StatusHandlerIntervalSeconds * time.Second) + localFile, err := jobConfig.Storage.Download(statusFile, tempLocalFile) + if err != nil { + continue } + + bytes, _ := ioutil.ReadFile(localFile) + workerMessage := workertypes.MessageContent{} + err = json.Unmarshal(bytes, &workerMessage) + if err != nil { + continue + } + + wo := clienttypes.Output{} + wo.Models = workerMessage.Results + wo.OwnerInfo = workerMessage.OwnerInfo + + msg := &clienttypes.UpstreamMessage{ + Phase: workerMessage.Kind, + Status: workerMessage.Status, + Output: &wo, + } + + name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind) + if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { + 
klog.Errorf("job(%s) failed to write message: %v", name, err) + continue + } + + if err = jobConfig.Storage.DeleteFile(statusFile); err != nil { + continue + } + + if err = jobConfig.Storage.DeleteFile(tempLocalFile); err != nil { + continue + } + + break } } diff --git a/pkg/localcontroller/storage/minio.go b/pkg/localcontroller/storage/minio.go index a4453ff48..0b5d672b2 100644 --- a/pkg/localcontroller/storage/minio.go +++ b/pkg/localcontroller/storage/minio.go @@ -145,3 +145,20 @@ func (mc *MinioClient) parseURL(URL string) (string, string, error) { return "", "", fmt.Errorf("invalid url(%s)", URL) } + +// deleteFile deletes file +func (mc *MinioClient) deleteFile(objectURL string) error { + bucket, absPath, err := mc.parseURL(objectURL) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut) + defer cancel() + + if err = mc.Client.RemoveObject(ctx, bucket, absPath, minio.RemoveObjectOptions{}); err != nil { + return fmt.Errorf("delete file(url=%s) failed, error: %+v", objectURL, err) + } + + return nil +} diff --git a/pkg/localcontroller/storage/storage.go b/pkg/localcontroller/storage/storage.go index 8855fc95f..25bb45e9a 100644 --- a/pkg/localcontroller/storage/storage.go +++ b/pkg/localcontroller/storage/storage.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "net/url" + "os" "path" "path/filepath" @@ -232,3 +233,19 @@ func (s *Storage) CopyFile(srcURL string, objectURL string) error { return nil } + +// DeleteFile deletes file +func (s *Storage) DeleteFile(objectURL string) error { + prefix, err := s.CheckURL(objectURL) + if err != nil { + return err + } + switch prefix { + case S3Prefix: + return s.MinioClient.deleteFile(objectURL) + case LocalPrefix: + return os.Remove(objectURL) + default: + return fmt.Errorf("invalid url(%s)", objectURL) + } +}