Skip to content

Commit

Permalink
support for modelarts adapter
Browse files Browse the repository at this point in the history
Signed-off-by: JimmyYang20 <yangjin39@huawei.com>
  • Loading branch information
JimmyYang20 committed Jan 4, 2022
1 parent 78220b9 commit 48682c5
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 22 deletions.
101 changes: 79 additions & 22 deletions pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package lifelonglearning

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -27,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
lruexpirecache "k8s.io/apimachinery/pkg/util/cache"
utilrand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -52,6 +54,8 @@ const (
KindName = "LifelongLearningJob"
// Name is this controller name
Name = "LifelongLearning"
// VirtualKubeletNode is virtual node
VirtualKubeletNode = "virtual-kubelet"
)

// Kind contains the schema.GroupVersionKind for this controller type.
Expand Down Expand Up @@ -82,6 +86,8 @@ type Controller struct {
cfg *config.ControllerConfig

sendToEdgeFunc runtime.DownstreamSendFunc

lruExpireCache *lruexpirecache.LRUExpireCache
}

// Run starts the main goroutine responsible for watching and syncing jobs.
Expand Down Expand Up @@ -379,14 +385,17 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er
// include train, eval, deploy pod
var err error
if jobStage == sednav1.LLJobDeploy {
err = c.restartInferPod(job)
if err != nil {
klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err)
return needUpdated, err
}
if !c.hasJobInCache(job) {
err = c.restartInferPod(job)
if err != nil {
klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err)
return needUpdated, err
}

klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name)
newConditionType = sednav1.LLJobStageCondCompleted
klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name)
newConditionType = sednav1.LLJobStageCondCompleted
c.addJobToCache(job)
}
} else {
if podStatus != v1.PodPending && podStatus != v1.PodRunning {
err = c.createPod(job, jobStage)
Expand All @@ -406,10 +415,6 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er

// watch pod status, if pod running, set type running
newConditionType = sednav1.LLJobStageCondRunning
} else if podStatus == v1.PodSucceeded {
// watch pod status, if pod completed, set type completed
newConditionType = sednav1.LLJobStageCondCompleted
klog.V(2).Infof("lifelonglearning job %v/%v %v stage completed!", job.Namespace, job.Name, jobStage)
} else if podStatus == v1.PodFailed {
newConditionType = sednav1.LLJobStageCondFailed
klog.V(2).Infof("lifelonglearning job %v/%v %v stage failed!", job.Namespace, job.Name, jobStage)
Expand Down Expand Up @@ -491,6 +496,25 @@ func (c *Controller) getSpecifiedPods(job *sednav1.LifelongLearningJob, podType
return latestPod
}

func (c *Controller) getHas256(target interface{}) string {
h := sha256.New()
h.Write([]byte(fmt.Sprintf("%v", target)))
return fmt.Sprintf("%x", h.Sum(nil))
}

func (c *Controller) addJobToCache(job *sednav1.LifelongLearningJob) {
c.lruExpireCache.Add(c.getHas256(job.Status), job, 10*time.Second)
}

func (c *Controller) hasJobInCache(job *sednav1.LifelongLearningJob) bool {
_, ok := c.lruExpireCache.Get(c.getHas256(job.Status))
if !ok {
return false
}

return true
}

func (c *Controller) restartInferPod(job *sednav1.LifelongLearningJob) error {
inferPod := c.getSpecifiedPods(job, runtime.InferencePodType)
if inferPod == nil {
Expand Down Expand Up @@ -542,6 +566,18 @@ func IsJobFinished(j *sednav1.LifelongLearningJob) bool {
return false
}

func (c *Controller) addPodAnnotations(spec *v1.PodTemplateSpec, key string, value string) {
ann := spec.GetAnnotations()
if ann == nil {
ann = make(map[string]string)
}

if _, ok := ann[key]; !ok {
ann[key] = value
spec.SetAnnotations(ann)
}
}

func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1.LLJobStage) (err error) {
ctx := context.Background()
var podTemplate *v1.PodTemplateSpec
Expand Down Expand Up @@ -592,12 +628,20 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1
}

var workerParam *runtime.WorkerParam = new(runtime.WorkerParam)

if podtype == sednav1.LLJobTrain {
workerParam.WorkerType = "Train"
workerParam.WorkerType = runtime.TrainPodType

podTemplate = &job.Spec.TrainSpec.Template
// Env parameters for train

c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType)
c.addPodAnnotations(podTemplate, "data", dataURL)
datasetUseInitializer := true
if podTemplate.Spec.NodeName == VirtualKubeletNode {
datasetUseInitializer = false
}

workerParam.Env = map[string]string{
"NAMESPACE": job.Namespace,
"JOB_NAME": job.Name,
Expand All @@ -621,7 +665,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1
URL: &runtime.MountURL{
URL: dataURL,
Secret: jobSecret,
DownloadByInitializer: true,
DownloadByInitializer: datasetUseInitializer,
},
EnvName: "TRAIN_DATASET_URL",
},
Expand All @@ -632,14 +676,25 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1
Secret: datasetSecret,
URL: originalDataURLOrIndex,
Indirect: dataset.Spec.URL != originalDataURLOrIndex,
DownloadByInitializer: true,
DownloadByInitializer: datasetUseInitializer,
},
EnvName: "ORIGINAL_DATASET_URL",
},
)
} else {
podTemplate = &job.Spec.EvalSpec.Template
workerParam.WorkerType = "Eval"
workerParam.WorkerType = runtime.EvalPodType

c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType)
c.addPodAnnotations(podTemplate, "data", dataURL)
datasetUseInitializer := true
if podTemplate.Spec.NodeName == VirtualKubeletNode {
datasetUseInitializer = false
}
modelUseInitializer := true
if podTemplate.Spec.NodeName == VirtualKubeletNode {
modelUseInitializer = false
}

// Configure Env information for eval by initial WorkerParam
workerParam.Env = map[string]string{
Expand All @@ -656,7 +711,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1
modelMountURLs = append(modelMountURLs, runtime.MountURL{
URL: url,
Secret: jobSecret,
DownloadByInitializer: true,
DownloadByInitializer: modelUseInitializer,
})
}
workerParam.Mounts = append(workerParam.Mounts,
Expand All @@ -679,7 +734,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1
URL: &runtime.MountURL{
URL: dataURL,
Secret: datasetSecret,
DownloadByInitializer: true,
DownloadByInitializer: datasetUseInitializer,
},
Name: "datasets",
EnvName: "TEST_DATASET_URL",
Expand All @@ -689,7 +744,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1
URL: &runtime.MountURL{
Secret: datasetSecret,
URL: originalDataURLOrIndex,
DownloadByInitializer: true,
DownloadByInitializer: datasetUseInitializer,
Indirect: dataset.Spec.URL != originalDataURLOrIndex,
},
Name: "origin-dataset",
Expand Down Expand Up @@ -744,6 +799,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error {
}

workerParam.WorkerType = runtime.InferencePodType
c.addPodAnnotations(&job.Spec.DeploySpec.Template, "type", workerParam.WorkerType)
workerParam.HostNetwork = true

// create edge pod
Expand All @@ -764,10 +820,11 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")})

jc := &Controller{
kubeClient: cc.KubeClient,
client: cc.SednaClient.SednaV1alpha1(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name),
cfg: cfg,
kubeClient: cc.KubeClient,
client: cc.SednaClient.SednaV1alpha1(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name),
cfg: cfg,
lruExpireCache: lruexpirecache.NewLRUExpireCache(10),
}

jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"bufio"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -61,6 +63,9 @@ const (
AnnotationsRoundsKey = "sedna.io/rounds"
AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples"
AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval"

// WorkerS3StatusHandlerIntervalSeconds is interval time of handling s3 status of worker
WorkerS3StatusHandlerIntervalSeconds = 30
)

// LifelongLearningJobManager defines lifelong-learning-job Manager
Expand Down Expand Up @@ -257,6 +262,8 @@ func (lm *Manager) trainTask(job *Job) error {
// continue anyway
}

go lm.monitorS3Worker(job, sednav1.LLJobTrain)

jobConfig.TrainTriggerStatus = TriggerCompletedStatus
klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobStage)
Expand Down Expand Up @@ -297,6 +304,8 @@ func (lm *Manager) evalTask(job *Job) error {

forwardSamples(jobConfig, jobStage)

go lm.monitorS3Worker(job, sednav1.LLJobEval)

jobConfig.EvalTriggerStatus = TriggerCompletedStatus
klog.Infof("job(%s) completed the %sing phase triggering task successfully",
jobConfig.UniqueIdentifier, jobStage)
Expand Down Expand Up @@ -968,7 +977,62 @@ func (lm *Manager) monitorWorker() {
if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil {
klog.Errorf("job(%s) failed to write message: %v", name, err)
continue
} else {
klog.Infof("job(%s) write message(%v) to GM", name, msg)
}
}
}

func (lm *Manager) monitorS3Worker(job *Job, stage sednav1.LLJobStage) {
jobConfig := job.JobConfig
var statusFile string
switch stage {
case sednav1.LLJobTrain:
statusFile = strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/")
case sednav1.LLJobEval:
statusFile = strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/")
}

tempLocalFile := filepath.Join(os.TempDir(), "status.json")
for {
time.Sleep(WorkerS3StatusHandlerIntervalSeconds * time.Second)
localFile, err := jobConfig.Storage.Download(statusFile, tempLocalFile)
if err != nil {
continue
}

bytes, _ := ioutil.ReadFile(localFile)
workerMessage := workertypes.MessageContent{}
err = json.Unmarshal(bytes, &workerMessage)
if err != nil {
continue
}

wo := clienttypes.Output{}
wo.Models = workerMessage.Results
wo.OwnerInfo = workerMessage.OwnerInfo

msg := &clienttypes.UpstreamMessage{
Phase: workerMessage.Kind,
Status: workerMessage.Status,
Output: &wo,
}

name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil {
klog.Errorf("job(%s) failed to write message: %v", name, err)
continue
}

if err = jobConfig.Storage.DeleteFile(statusFile); err != nil {
continue
}

if err = jobConfig.Storage.DeleteFile(tempLocalFile); err != nil {
continue
}

break
}
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/localcontroller/storage/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,20 @@ func (mc *MinioClient) parseURL(URL string) (string, string, error) {

return "", "", fmt.Errorf("invalid url(%s)", URL)
}

// deleteFile deletes file
func (mc *MinioClient) deleteFile(objectURL string) error {
bucket, absPath, err := mc.parseURL(objectURL)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut)
defer cancel()

if err = mc.Client.RemoveObject(ctx, bucket, absPath, minio.RemoveObjectOptions{}); err != nil {
return fmt.Errorf("delete file(url=%s) failed, error: %+v", objectURL, err)
}

return nil
}
17 changes: 17 additions & 0 deletions pkg/localcontroller/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"net/url"
"os"
"path"
"path/filepath"

Expand Down Expand Up @@ -232,3 +233,19 @@ func (s *Storage) CopyFile(srcURL string, objectURL string) error {

return nil
}

// DeleteFile deletes file
func (s *Storage) DeleteFile(objectURL string) error {
prefix, err := s.CheckURL(objectURL)
if err != nil {
return err
}
switch prefix {
case S3Prefix:
return s.MinioClient.deleteFile(objectURL)
case LocalPrefix:
return os.Remove(objectURL)
default:
return fmt.Errorf("invalid url(%s)", objectURL)
}
}

0 comments on commit 48682c5

Please sign in to comment.