From 48682c5a059c155b51c69b0804e946a620e7054a Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Wed, 22 Dec 2021 09:41:21 +0800 Subject: [PATCH] support for modelarts adapter Signed-off-by: JimmyYang20 --- .../lifelonglearning/lifelonglearningjob.go | 101 ++++++++++++++---- .../lifelonglearning/lifelonglearningjob.go | 64 +++++++++++ pkg/localcontroller/storage/minio.go | 17 +++ pkg/localcontroller/storage/storage.go | 17 +++ 4 files changed, 177 insertions(+), 22 deletions(-) diff --git a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go index b2ed1e1c8..894ae5b5d 100644 --- a/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go @@ -18,6 +18,7 @@ package lifelonglearning import ( "context" + "crypto/sha256" "encoding/json" "fmt" "k8s.io/apimachinery/pkg/types" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + lruexpirecache "k8s.io/apimachinery/pkg/util/cache" utilrand "k8s.io/apimachinery/pkg/util/rand" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -52,6 +54,8 @@ const ( KindName = "LifelongLearningJob" // Name is this controller name Name = "LifelongLearning" + // VirtualKubeletNode is virtual node + VirtualKubeletNode = "virtual-kubelet" ) // Kind contains the schema.GroupVersionKind for this controller type. @@ -82,6 +86,8 @@ type Controller struct { cfg *config.ControllerConfig sendToEdgeFunc runtime.DownstreamSendFunc + + lruExpireCache *lruexpirecache.LRUExpireCache } // Run starts the main goroutine responsible for watching and syncing jobs. @@ -379,14 +385,17 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er // include train, eval, deploy pod var err error if jobStage == sednav1.LLJobDeploy { - err = c.restartInferPod(job) - if err != nil { - klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) - return needUpdated, err - } + if !c.hasJobInCache(job) { + err = c.restartInferPod(job) + if err != nil { + klog.V(2).Infof("lifelonglearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err) + return needUpdated, err + } - klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) - newConditionType = sednav1.LLJobStageCondCompleted + klog.V(2).Infof("lifelonglearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name) + newConditionType = sednav1.LLJobStageCondCompleted + c.addJobToCache(job) + } } else { if podStatus != v1.PodPending && podStatus != v1.PodRunning { err = c.createPod(job, jobStage) @@ -406,10 +415,6 @@ func (c *Controller) transitJobState(job *sednav1.LifelongLearningJob) (bool, er // watch pod status, if pod running, set type running newConditionType = sednav1.LLJobStageCondRunning - } else if podStatus == v1.PodSucceeded { - // watch pod status, if pod completed, set type completed - newConditionType = sednav1.LLJobStageCondCompleted - klog.V(2).Infof("lifelonglearning job %v/%v %v stage completed!", job.Namespace, job.Name, jobStage) } else if podStatus == v1.PodFailed { newConditionType = sednav1.LLJobStageCondFailed klog.V(2).Infof("lifelonglearning job %v/%v %v stage failed!", job.Namespace, job.Name, jobStage) @@ -491,6 +496,25 @@ func (c *Controller) getSpecifiedPods(job *sednav1.LifelongLearningJob, podType return latestPod } +func (c *Controller) getHas256(target interface{}) string { + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%v", target))) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func (c *Controller) addJobToCache(job *sednav1.LifelongLearningJob) { + c.lruExpireCache.Add(c.getHas256(job.Status), job, 10*time.Second) +} + +func (c *Controller) hasJobInCache(job *sednav1.LifelongLearningJob) bool { + _, ok := c.lruExpireCache.Get(c.getHas256(job.Status)) + if !ok { + return false + } + + return true +} + func (c *Controller) restartInferPod(job *sednav1.LifelongLearningJob) error { inferPod := c.getSpecifiedPods(job, runtime.InferencePodType) if inferPod == nil { @@ -542,6 +566,18 @@ func IsJobFinished(j *sednav1.LifelongLearningJob) bool { return false } +func (c *Controller) addPodAnnotations(spec *v1.PodTemplateSpec, key string, value string) { + ann := spec.GetAnnotations() + if ann == nil { + ann = make(map[string]string) + } + + if _, ok := ann[key]; !ok { + ann[key] = value + spec.SetAnnotations(ann) + } +} + func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1.LLJobStage) (err error) { ctx := context.Background() var podTemplate *v1.PodTemplateSpec @@ -592,12 +628,20 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 } var workerParam *runtime.WorkerParam = new(runtime.WorkerParam) + if podtype == sednav1.LLJobTrain { - workerParam.WorkerType = "Train" + workerParam.WorkerType = runtime.TrainPodType podTemplate = &job.Spec.TrainSpec.Template // Env parameters for train + c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType) + c.addPodAnnotations(podTemplate, "data", dataURL) + datasetUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + datasetUseInitializer = false + } + workerParam.Env = map[string]string{ "NAMESPACE": job.Namespace, "JOB_NAME": job.Name, @@ -621,7 +665,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ URL: dataURL, Secret: jobSecret, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, EnvName: "TRAIN_DATASET_URL", }, @@ -632,14 +676,25 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 Secret: datasetSecret, URL: originalDataURLOrIndex, Indirect: dataset.Spec.URL != originalDataURLOrIndex, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, EnvName: "ORIGINAL_DATASET_URL", }, ) } else { podTemplate = &job.Spec.EvalSpec.Template - workerParam.WorkerType = "Eval" + workerParam.WorkerType = runtime.EvalPodType + + c.addPodAnnotations(podTemplate, "type", workerParam.WorkerType) + c.addPodAnnotations(podTemplate, "data", dataURL) + datasetUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + datasetUseInitializer = false + } + modelUseInitializer := true + if podTemplate.Spec.NodeName == VirtualKubeletNode { + modelUseInitializer = false + } // Configure Env information for eval by initial WorkerParam workerParam.Env = map[string]string{ @@ -656,7 +711,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 modelMountURLs = append(modelMountURLs, runtime.MountURL{ URL: url, Secret: jobSecret, - DownloadByInitializer: true, + DownloadByInitializer: modelUseInitializer, }) } workerParam.Mounts = append(workerParam.Mounts, @@ -679,7 +734,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ URL: dataURL, Secret: datasetSecret, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, }, Name: "datasets", EnvName: "TEST_DATASET_URL", @@ -689,7 +744,7 @@ func (c *Controller) createPod(job *sednav1.LifelongLearningJob, podtype sednav1 URL: &runtime.MountURL{ Secret: datasetSecret, URL: originalDataURLOrIndex, - DownloadByInitializer: true, + DownloadByInitializer: datasetUseInitializer, Indirect: dataset.Spec.URL != originalDataURLOrIndex, }, Name: "origin-dataset", @@ -744,6 +799,7 @@ func (c *Controller) createInferPod(job *sednav1.LifelongLearningJob) error { } workerParam.WorkerType = runtime.InferencePodType + c.addPodAnnotations(&job.Spec.DeploySpec.Template, "type", workerParam.WorkerType) workerParam.HostNetwork = true // create edge pod @@ -764,10 +820,11 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")}) jc := &Controller{ - kubeClient: cc.KubeClient, - client: cc.SednaClient.SednaV1alpha1(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), - cfg: cfg, + kubeClient: cc.KubeClient, + client: cc.SednaClient.SednaV1alpha1(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name), + cfg: cfg, + lruExpireCache: lruexpirecache.NewLRUExpireCache(10), } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go index 05337b2c3..a5763fc6d 100644 --- a/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go +++ b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go @@ -20,8 +20,10 @@ import ( "bufio" "encoding/json" "fmt" + "io/ioutil" "os" "path" + "path/filepath" "strconv" "strings" "sync" @@ -61,6 +63,9 @@ const ( AnnotationsRoundsKey = "sedna.io/rounds" AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples" AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval" + + // WorkerS3StatusHandlerIntervalSeconds is interval time of handling s3 status of worker + WorkerS3StatusHandlerIntervalSeconds = 30 ) // LifelongLearningJobManager defines lifelong-learning-job Manager @@ -257,6 +262,8 @@ func (lm *Manager) trainTask(job *Job) error { // continue anyway } + go lm.monitorS3Worker(job, sednav1.LLJobTrain) + jobConfig.TrainTriggerStatus = TriggerCompletedStatus klog.Infof("job(name=%s) complete the %sing phase triggering task successfully", jobConfig.UniqueIdentifier, jobStage) @@ -297,6 +304,8 @@ func (lm *Manager) evalTask(job *Job) error { forwardSamples(jobConfig, jobStage) + go lm.monitorS3Worker(job, sednav1.LLJobEval) + jobConfig.EvalTriggerStatus = TriggerCompletedStatus klog.Infof("job(%s) completed the %sing phase triggering task successfully", jobConfig.UniqueIdentifier, jobStage) @@ -968,7 +977,62 @@ func (lm *Manager) monitorWorker() { if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { klog.Errorf("job(%s) failed to write message: %v", name, err) continue + } else { + klog.Infof("job(%s) write message(%v) to GM", name, msg) + } + } +} + +func (lm *Manager) monitorS3Worker(job *Job, stage sednav1.LLJobStage) { + jobConfig := job.JobConfig + var statusFile string + switch stage { + case sednav1.LLJobTrain: + statusFile = strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/") + case sednav1.LLJobEval: + statusFile = strings.Join([]string{jobConfig.OutputConfig.EvalOutput, strconv.Itoa(jobConfig.Rounds), "status.json"}, "/") + } + + tempLocalFile := filepath.Join(os.TempDir(), "status.json") + for { + time.Sleep(WorkerS3StatusHandlerIntervalSeconds * time.Second) + localFile, err := jobConfig.Storage.Download(statusFile, tempLocalFile) + if err != nil { + continue } + + bytes, _ := ioutil.ReadFile(localFile) + workerMessage := workertypes.MessageContent{} + err = json.Unmarshal(bytes, &workerMessage) + if err != nil { + continue + } + + wo := clienttypes.Output{} + wo.Models = workerMessage.Results + wo.OwnerInfo = workerMessage.OwnerInfo + + msg := &clienttypes.UpstreamMessage{ + Phase: workerMessage.Kind, + Status: workerMessage.Status, + Output: &wo, + } + + name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind) + if err := lm.Client.WriteMessage(msg, job.getHeader()); err != nil { + klog.Errorf("job(%s) failed to write message: %v", name, err) + continue + } + + if err = jobConfig.Storage.DeleteFile(statusFile); err != nil { + continue + } + + if err = jobConfig.Storage.DeleteFile(tempLocalFile); err != nil { + continue + } + + break } } diff --git a/pkg/localcontroller/storage/minio.go b/pkg/localcontroller/storage/minio.go index a4453ff48..0b5d672b2 100644 --- a/pkg/localcontroller/storage/minio.go +++ b/pkg/localcontroller/storage/minio.go @@ -145,3 +145,20 @@ func (mc *MinioClient) parseURL(URL string) (string, string, error) { return "", "", fmt.Errorf("invalid url(%s)", URL) } + +// deleteFile deletes file +func (mc *MinioClient) deleteFile(objectURL string) error { + bucket, absPath, err := mc.parseURL(objectURL) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), MaxTimeOut) + defer cancel() + + if err = mc.Client.RemoveObject(ctx, bucket, absPath, minio.RemoveObjectOptions{}); err != nil { + return fmt.Errorf("delete file(url=%s) failed, error: %+v", objectURL, err) + } + + return nil +} diff --git a/pkg/localcontroller/storage/storage.go b/pkg/localcontroller/storage/storage.go index 8855fc95f..25bb45e9a 100644 --- a/pkg/localcontroller/storage/storage.go +++ b/pkg/localcontroller/storage/storage.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "net/url" + "os" "path" "path/filepath" @@ -232,3 +233,19 @@ func (s *Storage) CopyFile(srcURL string, objectURL string) error { return nil } + +// DeleteFile deletes file +func (s *Storage) DeleteFile(objectURL string) error { + prefix, err := s.CheckURL(objectURL) + if err != nil { + return err + } + switch prefix { + case S3Prefix: + return s.MinioClient.deleteFile(objectURL) + case LocalPrefix: + return os.Remove(objectURL) + default: + return fmt.Errorf("invalid url(%s)", objectURL) + } +}