From 44609f0090816f40ef64299dbcae428bd2b1694c Mon Sep 17 00:00:00 2001 From: JimmyYang20 Date: Wed, 4 Aug 2021 09:36:58 +0800 Subject: [PATCH] lc: decouple all features into independent package Signed-off-by: JimmyYang20 --- cmd/sedna-lc/app/server.go | 22 +- pkg/localcontroller/gmclient/types.go | 37 +++- pkg/localcontroller/manager/types.go | 120 ----------- .../{manager => managers/dataset}/dataset.go | 83 +++---- .../federatedlearningjob.go | 46 ++-- .../incrementallearningjob.go | 203 ++++++++++-------- .../jointinference}/jointinferenceservice.go | 46 ++-- .../lifelonglearning}/lifelonglearningjob.go | 180 +++++++++------- .../{manager => managers/model}/model.go | 37 ++-- pkg/localcontroller/managers/types.go | 39 ++++ pkg/localcontroller/server/server.go | 13 +- pkg/localcontroller/storage/storage.go | 17 +- pkg/localcontroller/worker/worker.go | 48 +++++ 13 files changed, 473 insertions(+), 418 deletions(-) delete mode 100644 pkg/localcontroller/manager/types.go rename pkg/localcontroller/{manager => managers/dataset}/dataset.go (77%) rename pkg/localcontroller/{manager => managers/federatedlearning}/federatedlearningjob.go (70%) rename pkg/localcontroller/{manager => managers/incrementallearning}/incrementallearningjob.go (83%) rename pkg/localcontroller/{manager => managers/jointinference}/jointinferenceservice.go (68%) rename pkg/localcontroller/{manager => managers/lifelonglearning}/lifelonglearningjob.go (82%) rename pkg/localcontroller/{manager => managers/model}/model.go (68%) create mode 100644 pkg/localcontroller/managers/types.go create mode 100644 pkg/localcontroller/worker/worker.go diff --git a/cmd/sedna-lc/app/server.go b/cmd/sedna-lc/app/server.go index e371509e9..52a56f57d 100644 --- a/cmd/sedna-lc/app/server.go +++ b/cmd/sedna-lc/app/server.go @@ -29,7 +29,13 @@ import ( "github.com/kubeedge/sedna/cmd/sedna-lc/app/options" "github.com/kubeedge/sedna/pkg/localcontroller/common/constants" 
"github.com/kubeedge/sedna/pkg/localcontroller/gmclient" - "github.com/kubeedge/sedna/pkg/localcontroller/manager" + "github.com/kubeedge/sedna/pkg/localcontroller/managers" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/federatedlearning" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/incrementallearning" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/jointinference" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/lifelonglearning" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/model" "github.com/kubeedge/sedna/pkg/localcontroller/server" "github.com/kubeedge/sedna/pkg/version/verflag" ) @@ -86,21 +92,21 @@ func runServer() { return } - dm := manager.NewDatasetManager(c, Options) + dm := dataset.New(c, Options) - mm := manager.NewModelManager(c) + mm := model.New(c) - jm := manager.NewJointInferenceManager(c) + jm := jointinference.New(c) - fm := manager.NewFederatedLearningManager(c) + fm := federatedlearning.New(c) - im := manager.NewIncrementalJobManager(c, dm, mm, Options) + im := incrementallearning.New(c, dm, mm, Options) - lm := manager.NewLifelongLearningJobManager(c, dm, mm, Options) + lm := lifelonglearning.New(c, dm, Options) s := server.New(Options) - for _, m := range []manager.FeatureManager{ + for _, m := range []managers.FeatureManager{ dm, mm, jm, fm, im, lm, } { s.AddFeatureManager(m) diff --git a/pkg/localcontroller/gmclient/types.go b/pkg/localcontroller/gmclient/types.go index 205a39b9b..59e764db3 100644 --- a/pkg/localcontroller/gmclient/types.go +++ b/pkg/localcontroller/gmclient/types.go @@ -16,6 +16,8 @@ limitations under the License. 
package gmclient +import messagetypes "github.com/kubeedge/sedna/pkg/globalmanager/messagelayer/model" + const ( // InsertOperation is the insert value InsertOperation = "insert" @@ -25,18 +27,39 @@ const ( StatusOperation = "status" ) -// Message defines message +// Message defines message between LC and GM type Message struct { Header MessageHeader `json:"header"` Content []byte `json:"content"` } -// MessageHeader define header of message -type MessageHeader struct { - Namespace string `json:"namespace"` - ResourceKind string `json:"resourceKind"` - ResourceName string `json:"resourceName"` - Operation string `json:"operation"` +// MessageHeader defines the header between LC and GM +type MessageHeader = messagetypes.MessageHeader + +// UpstreamMessage defines send message content to GM +type UpstreamMessage struct { + Phase string `json:"phase"` + Status string `json:"status"` + Input *Input `json:"input,omitempty"` + Output *Output `json:"output"` +} + +type Model struct { + Format string `json:"format"` + URL string `json:"url"` + Metrics map[string]interface{} `json:"metrics,omitempty"` +} + +type Input struct { + Models []Model `json:"models,omitempty"` + DataURL string `json:"dataURL,omitempty"` + DataIndexURL string `json:"dataIndexURL,omitempty"` + OutputDir string `json:"outputDir,omitempty"` +} + +type Output struct { + Models []map[string]interface{} `json:"models"` + OwnerInfo map[string]interface{} `json:"ownerInfo"` } type MessageResourceHandler interface { diff --git a/pkg/localcontroller/manager/types.go b/pkg/localcontroller/manager/types.go deleted file mode 100644 index c0a53062d..000000000 --- a/pkg/localcontroller/manager/types.go +++ /dev/null @@ -1,120 +0,0 @@ -/* -Copyright 2021 The KubeEdge Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package manager - -import ( - "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" -) - -const ( - // WorkerMessageChannelCacheSize is size of channel cache - WorkerMessageChannelCacheSize = 100 - - // TrainPhase is the train phase in incremental-learning-job - TrainPhase = "train" - // EvalPhase is the eval phase in incremental-learning-job - EvalPhase = "eval" - // DeployPhase is the deploy phase in incremental-learning-job - DeployPhase = "deploy" - - // WorkerWaitingStatus is the waiting status about worker - WorkerWaitingStatus = "waiting" - - // WorkerReadyStatus is the ready status about worker - WorkerReadyStatus = "ready" - // WorkerCompletedStatus is the completed status about worker - WorkerCompletedStatus = "completed" - // WorkerFailedStatus is the failed status about worker - WorkerFailedStatus = "failed" - - // TriggerReadyStatus is the ready status about trigger in incremental-learning-job - TriggerReadyStatus = "ready" - // TriggerCompletedStatus is the completed status about trigger in incremental-learning-job - TriggerCompletedStatus = "completed" - - // CredentialAnnotationKey is credential of the storage service - CredentialAnnotationKey = "sedna.io/credential" - - // DatasetFormatCSV is csv format of dataset - DatasetFormatCSV = "csv" - // DatasetFormatTXT is txt format of dataset - DatasetFormatTXT = "txt" -) - -// WorkerMessage defines message struct from worker -type WorkerMessage struct { - Name string `json:"name"` - Namespace string `json:"namespace"` - OwnerName string `json:"ownerName"` - OwnerKind string `json:"ownerKind"` - Kind 
string `json:"kind"` - Status string `json:"status"` - OwnerInfo map[string]interface{} `json:"ownerInfo"` - Results []map[string]interface{} `json:"results"` -} - -// MetaData defines metadata -type MetaData struct { - Name string `json:"name"` - Namespace string `json:"namespace"` -} - -// ModelInfo defines model -type ModelInfo struct { - Format string `json:"format"` - URL string `json:"url"` - Metrics map[string][]float64 `json:"metrics,omitempty"` -} - -// UpstreamMessage defines send message to GlobalManager -type UpstreamMessage struct { - Phase string `json:"phase"` - Status string `json:"status"` - Input *WorkerInput `json:"input,omitempty"` - Output *WorkerOutput `json:"output"` -} - -type WorkerInput struct { - // Only one model cases - Models []ModelInfo `json:"models,omitempty"` - DataURL string `json:"dataURL,omitempty"` - DataIndexURL string `json:"dataIndexURL,omitempty"` - OutputDir string `json:"outputDir,omitempty"` -} - -// WorkerOutput defines output information of worker -type WorkerOutput struct { - Models []map[string]interface{} `json:"models"` - OwnerInfo map[string]interface{} `json:"ownerInfo"` -} - -// FeatureManager defines feature manager -type FeatureManager interface { - // Start starts the manager - Start() error - - // GetName returns name of the manager - GetName() string - - // AddWorkerMessage dispatch the worker message to manager - AddWorkerMessage(message WorkerMessage) - - // Insert includes gm message creation/updation - Insert(*gmclient.Message) error - - Delete(*gmclient.Message) error -} diff --git a/pkg/localcontroller/manager/dataset.go b/pkg/localcontroller/managers/dataset/dataset.go similarity index 77% rename from pkg/localcontroller/manager/dataset.go rename to pkg/localcontroller/managers/dataset/dataset.go index b578b697e..2d94b8adb 100644 --- a/pkg/localcontroller/manager/dataset.go +++ b/pkg/localcontroller/managers/dataset/dataset.go @@ -14,14 +14,13 @@ See the License for the specific language governing 
permissions and limitations under the License. */ -package manager +package dataset import ( "bufio" "encoding/json" "fmt" "os" - "path" "path/filepath" "strings" "time" @@ -30,22 +29,30 @@ import ( "github.com/kubeedge/sedna/cmd/sedna-lc/app/options" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" + "github.com/kubeedge/sedna/pkg/globalmanager/runtime" "github.com/kubeedge/sedna/pkg/localcontroller/db" - "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" "github.com/kubeedge/sedna/pkg/localcontroller/storage" "github.com/kubeedge/sedna/pkg/localcontroller/util" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" ) const ( // MonitorDataSourceIntervalSeconds is interval time of monitoring data source MonitorDataSourceIntervalSeconds = 60 - // DatasetResourceKind is kind of dataset resource - DatasetResourceKind = "dataset" + // KindName is kind of dataset resource + KindName = "dataset" + // CSVFormat is the comma-separated values format with an extra header. + // It can be used in structured data scenarios. + CSVFormat = "csv" + // TXTFormat is the line-separated format. + // It can be used in unstructured data scenarios. 
+ TXTFormat = "txt" ) // DatasetManager defines dataset manager -type DatasetManager struct { - Client gmclient.ClientI +type Manager struct { + Client clienttypes.ClientI DatasetMap map[string]*Dataset VolumeMountPrefix string } @@ -59,23 +66,16 @@ type Dataset struct { Storage storage.Storage } -// DatasetSpec defines dataset spec -type DatasetSpec struct { - Format string `json:"format"` - DataURL string `json:"url"` -} - // DataSource defines config for data source type DataSource struct { TrainSamples []string - ValidSamples []string NumberOfSamples int Header string } -// NewDatasetManager creates a dataset manager -func NewDatasetManager(client gmclient.ClientI, options *options.LocalControllerOptions) *DatasetManager { - dm := DatasetManager{ +// New creates a dataset manager +func New(client clienttypes.ClientI, options *options.LocalControllerOptions) *Manager { + dm := Manager{ Client: client, DatasetMap: make(map[string]*Dataset), VolumeMountPrefix: options.VolumeMountPrefix, @@ -85,18 +85,18 @@ func NewDatasetManager(client gmclient.ClientI, options *options.LocalController } // Start starts dataset manager -func (dm *DatasetManager) Start() error { +func (dm *Manager) Start() error { return nil } // GetDatasetChannel gets dataset -func (dm *DatasetManager) GetDataset(name string) (*Dataset, bool) { +func (dm *Manager) GetDataset(name string) (*Dataset, bool) { d, ok := dm.DatasetMap[name] return d, ok } // Insert inserts dataset to db -func (dm *DatasetManager) Insert(message *gmclient.Message) error { +func (dm *Manager) Insert(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) first := false dataset, ok := dm.DatasetMap[name] @@ -112,7 +112,7 @@ func (dm *DatasetManager) Insert(message *gmclient.Message) error { return err } - credential := dataset.ObjectMeta.Annotations[CredentialAnnotationKey] + credential := 
dataset.ObjectMeta.Annotations[runtime.SecretAnnotationKey] if credential != "" { if err := dataset.Storage.SetCredential(credential); err != nil { return fmt.Errorf("failed to set dataset(name=%s)'s storage credential, error: %+v", name, err) @@ -139,7 +139,7 @@ func (dm *DatasetManager) Insert(message *gmclient.Message) error { } // Delete deletes dataset config in db -func (dm *DatasetManager) Delete(message *gmclient.Message) error { +func (dm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) if ds, ok := dm.DatasetMap[name]; ok && ds.Done != nil { @@ -156,7 +156,7 @@ func (dm *DatasetManager) Delete(message *gmclient.Message) error { } // monitorDataSources monitors the data url of specified dataset -func (dm *DatasetManager) monitorDataSources(name string) { +func (dm *Manager) monitorDataSources(name string) { ds, ok := dm.DatasetMap[name] if !ok || ds == nil { return @@ -186,11 +186,11 @@ func (dm *DatasetManager) monitorDataSources(name string) { klog.Infof("dataset(name=%s) get samples from data source(url=%s) successfully. number of samples: %d", name, dataURL, dataSource.NumberOfSamples) - header := gmclient.MessageHeader{ + header := clienttypes.MessageHeader{ Namespace: ds.Namespace, ResourceKind: ds.Kind, ResourceName: ds.Name, - Operation: gmclient.StatusOperation, + Operation: clienttypes.StatusOperation, } if err := dm.Client.WriteMessage(struct { @@ -208,8 +208,8 @@ func (dm *DatasetManager) monitorDataSources(name string) { // getDataSource gets data source info func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, error) { - if path.Ext(dataURL) != ("." 
+ format) { - return nil, fmt.Errorf("dataset file url(%s)'s suffix is different from format(%s)", dataURL, format) + if err := ds.validFormat(format); err != nil { + return nil, err } localURL, err := ds.Storage.Download(dataURL, "") @@ -227,7 +227,7 @@ func (ds *Dataset) getDataSource(dataURL string, format string) (*DataSource, er // readByLine reads file by line func (ds *Dataset) readByLine(url string, format string) (*DataSource, error) { - samples, err := getSamples(url) + samples, err := GetSamples(url) if err != nil { klog.Errorf("read file %s failed, error: %v", url, err) return nil, err @@ -235,10 +235,10 @@ func (ds *Dataset) readByLine(url string, format string) (*DataSource, error) { numberOfSamples := 0 dataSource := DataSource{} - switch format { - case DatasetFormatTXT: + switch strings.ToLower(format) { + case TXTFormat: numberOfSamples += len(samples) - case DatasetFormatCSV: + case CSVFormat: // the first row of csv file is header if len(samples) == 0 { return nil, fmt.Errorf("file %s is empty", url) @@ -257,16 +257,16 @@ func (ds *Dataset) readByLine(url string, format string) (*DataSource, error) { return &dataSource, nil } -func (dm *DatasetManager) GetName() string { - return DatasetResourceKind +func (dm *Manager) GetName() string { + return KindName } -func (dm *DatasetManager) AddWorkerMessage(message WorkerMessage) { +func (dm *Manager) AddWorkerMessage(message workertypes.MessageContent) { // dummy } -// getSamples gets samples in a file -func getSamples(url string) ([]string, error) { +// GetSamples gets samples in a file +func GetSamples(url string) ([]string, error) { var samples = []string{} if !util.IsExists(url) { return nil, fmt.Errorf("url(%s) does not exist", url) @@ -294,3 +294,14 @@ func getSamples(url string) ([]string, error) { return samples, nil } + +// validFormat checks data format is valid +func (ds *Dataset) validFormat(format string) error { + for _, v := range []string{TXTFormat, CSVFormat} { + if 
strings.ToLower(format) == v { + return nil + } + } + + return fmt.Errorf("dataset format(%s) is invalid", format) +} diff --git a/pkg/localcontroller/manager/federatedlearningjob.go b/pkg/localcontroller/managers/federatedlearning/federatedlearningjob.go similarity index 70% rename from pkg/localcontroller/manager/federatedlearningjob.go rename to pkg/localcontroller/managers/federatedlearning/federatedlearningjob.go index 5996a7743..3bace6912 100644 --- a/pkg/localcontroller/manager/federatedlearningjob.go +++ b/pkg/localcontroller/managers/federatedlearning/federatedlearningjob.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package manager +package federatedlearning import ( "encoding/json" @@ -23,14 +23,16 @@ import ( sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/localcontroller/db" - "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + types "github.com/kubeedge/sedna/pkg/localcontroller/managers" "github.com/kubeedge/sedna/pkg/localcontroller/util" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" ) // FederatedLearningManager defines federated-learning-job manager -type FederatedLearningManager struct { - Client gmclient.ClientI - WorkerMessageChannel chan WorkerMessage +type Manager struct { + Client clienttypes.ClientI + WorkerMessageChannel chan workertypes.MessageContent } // FederatedLearning defines config for federated-learning-job @@ -39,29 +41,29 @@ type FederatedLearning struct { } const ( - //FederatedLearningJobKind is kind of federated-learning-job resource - FederatedLearningJobKind = "federatedlearningjob" + //KindName is kind of federated-learning-job resource + KindName = "federatedlearningjob" ) -// NewFederatedLearningManager creates a federated-learning-job manager -func NewFederatedLearningManager(client gmclient.ClientI) 
FeatureManager { - fm := &FederatedLearningManager{ +// New creates a federated-learning-job manager +func New(client clienttypes.ClientI) types.FeatureManager { + fm := &Manager{ Client: client, - WorkerMessageChannel: make(chan WorkerMessage, WorkerMessageChannelCacheSize), + WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize), } return fm } // Start starts federated-learning-job manager -func (fm *FederatedLearningManager) Start() error { +func (fm *Manager) Start() error { go fm.monitorWorker() return nil } // monitorWorker monitors message from worker -func (fm *FederatedLearningManager) monitorWorker() { +func (fm *Manager) monitorWorker() { for { workerMessageChannel := fm.WorkerMessageChannel workerMessage, ok := <-workerMessageChannel @@ -70,17 +72,17 @@ func (fm *FederatedLearningManager) monitorWorker() { } name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind) - header := gmclient.MessageHeader{ + header := clienttypes.MessageHeader{ Namespace: workerMessage.Namespace, ResourceKind: workerMessage.OwnerKind, ResourceName: workerMessage.OwnerName, - Operation: gmclient.StatusOperation, + Operation: clienttypes.StatusOperation, } - um := UpstreamMessage{ + um := clienttypes.UpstreamMessage{ Phase: workerMessage.Kind, Status: workerMessage.Status, - Output: &WorkerOutput{ + Output: &clienttypes.Output{ Models: workerMessage.Results, OwnerInfo: workerMessage.OwnerInfo, }, @@ -94,7 +96,7 @@ func (fm *FederatedLearningManager) monitorWorker() { } // Insert inserts federated-learning-job config in db -func (fm *FederatedLearningManager) Insert(message *gmclient.Message) error { +func (fm *Manager) Insert(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) fl := FederatedLearning{} @@ -111,7 +113,7 @@ func (fm *FederatedLearningManager) Insert(message 
*gmclient.Message) error { } // Delete deletes federated-learning-job config in db -func (fm *FederatedLearningManager) Delete(message *gmclient.Message) error { +func (fm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) if err := db.DeleteResource(name); err != nil { return err @@ -121,11 +123,11 @@ func (fm *FederatedLearningManager) Delete(message *gmclient.Message) error { } // AddWorkerMessage adds worker messages to the channel -func (fm *FederatedLearningManager) AddWorkerMessage(message WorkerMessage) { +func (fm *Manager) AddWorkerMessage(message workertypes.MessageContent) { fm.WorkerMessageChannel <- message } // GetName returns the name of the manager -func (fm *FederatedLearningManager) GetName() string { - return FederatedLearningJobKind +func (fm *Manager) GetName() string { + return KindName } diff --git a/pkg/localcontroller/manager/incrementallearningjob.go b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go similarity index 83% rename from pkg/localcontroller/manager/incrementallearningjob.go rename to pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go index 20166e0f2..75ffa5967 100644 --- a/pkg/localcontroller/manager/incrementallearningjob.go +++ b/pkg/localcontroller/managers/incrementallearning/incrementallearningjob.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package manager +package incrementallearning import ( "bufio" @@ -34,19 +34,22 @@ import ( gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/incrementallearning" "github.com/kubeedge/sedna/pkg/globalmanager/runtime" "github.com/kubeedge/sedna/pkg/localcontroller/db" - "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/model" "github.com/kubeedge/sedna/pkg/localcontroller/storage" "github.com/kubeedge/sedna/pkg/localcontroller/trigger" "github.com/kubeedge/sedna/pkg/localcontroller/util" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" ) // IncrementalLearningJob defines config for incremental-learning-job -type IncrementalLearningJob struct { +type Job struct { sednav1.IncrementalLearningJob JobConfig *JobConfig - Dataset *Dataset - Done chan struct{} + Dataset *dataset.Dataset Storage storage.Storage + Done chan struct{} } // JobConfig defines config for incremental-learning-job @@ -63,13 +66,15 @@ type JobConfig struct { OutputDir string OutputConfig *OutputConfig DataSamples *DataSamples - TrainModel *ModelInfo - DeployModel *ModelInfo - EvalModels []ModelInfo - EvalResult []ModelInfo + TrainModel *Model + DeployModel *Model + EvalModels []Model + EvalResult []Model Lock sync.Mutex } +type Model = clienttypes.Model + // OutputConfig defines config for job output type OutputConfig struct { SamplesOutput map[string]string `json:"trainData"` @@ -86,12 +91,12 @@ type DataSamples struct { } // IncrementalLearningJob defines incremental-learning-job manager -type IncrementalJobManager struct { - Client gmclient.ClientI - WorkerMessageChannel chan WorkerMessage - DatasetManager *DatasetManager - ModelManager *ModelManager - IncrementalJobMap map[string]*IncrementalLearningJob +type Manager struct { + Client clienttypes.ClientI + 
WorkerMessageChannel chan workertypes.MessageContent + DatasetManager *dataset.Manager + ModelManager *model.Manager + IncrementalJobMap map[string]*Job VolumeMountPrefix string } @@ -102,19 +107,24 @@ const ( DatasetHandlerIntervalSeconds = 10 // EvalSamplesCapacity is capacity of eval samples EvalSamplesCapacity = 5 - //IncrementalLearningJobKind is kind of incremental-learning-job resource - IncrementalLearningJobKind = "incrementallearningjob" + //KindName is kind of incremental-learning-job resource + KindName = "incrementallearningjob" + + // TriggerReadyStatus is the ready status about trigger + TriggerReadyStatus = "ready" + // TriggerCompletedStatus is the completed status about trigger + TriggerCompletedStatus = "completed" ) -// NewIncrementalJobManager creates a incremental-learning-job manager -func NewIncrementalJobManager(client gmclient.ClientI, datasetManager *DatasetManager, - modelManager *ModelManager, options *options.LocalControllerOptions) *IncrementalJobManager { - im := IncrementalJobManager{ +// New creates a incremental-learning-job manager +func New(client clienttypes.ClientI, datasetManager *dataset.Manager, + modelManager *model.Manager, options *options.LocalControllerOptions) *Manager { + im := Manager{ Client: client, - WorkerMessageChannel: make(chan WorkerMessage, WorkerMessageChannelCacheSize), + WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize), DatasetManager: datasetManager, ModelManager: modelManager, - IncrementalJobMap: make(map[string]*IncrementalLearningJob), + IncrementalJobMap: make(map[string]*Job), VolumeMountPrefix: options.VolumeMountPrefix, } @@ -122,14 +132,14 @@ func NewIncrementalJobManager(client gmclient.ClientI, datasetManager *DatasetMa } // Start starts incremental-learning-job manager -func (im *IncrementalJobManager) Start() error { +func (im *Manager) Start() error { go im.monitorWorker() return nil } // trainTask starts training task -func (im 
*IncrementalJobManager) trainTask(job *IncrementalLearningJob, currentRound int) error { +func (im *Manager) trainTask(job *Job, currentRound int) error { jobConfig := job.JobConfig latestCond := im.getLatestCondition(job) @@ -182,7 +192,7 @@ func (im *IncrementalJobManager) trainTask(job *IncrementalLearningJob, currentR } // evalTask starts eval task -func (im *IncrementalJobManager) evalTask(job *IncrementalLearningJob) error { +func (im *Manager) evalTask(job *Job) error { jobConfig := job.JobConfig latestCond := im.getLatestCondition(job) @@ -219,13 +229,13 @@ func (im *IncrementalJobManager) evalTask(job *IncrementalLearningJob) error { } // deployTask starts deploy task -func (im *IncrementalJobManager) deployTask(job *IncrementalLearningJob) { +func (im *Manager) deployTask(job *Job) { jobConfig := job.JobConfig var err error var neededDeploy bool neededDeploy, err = im.triggerDeployTask(job) - status := UpstreamMessage{Phase: string(sednav1.ILJobDeploy)} + status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)} if err == nil && neededDeploy { deployModel, err := im.deployModel(job) @@ -238,8 +248,8 @@ func (im *IncrementalJobManager) deployTask(job *IncrementalLearningJob) { status.Status = string(sednav1.ILJobStageCondFailed) } else { status.Status = string(sednav1.ILJobStageCondReady) - status.Input = &WorkerInput{ - Models: []ModelInfo{ + status.Input = &clienttypes.Input{ + Models: []Model{ *deployModel, }, } @@ -264,7 +274,7 @@ func (im *IncrementalJobManager) deployTask(job *IncrementalLearningJob) { } // startJob starts a job -func (im *IncrementalJobManager) startJob(name string) { +func (im *Manager) startJob(name string) { var err error job := im.IncrementalJobMap[name] @@ -321,13 +331,13 @@ func (im *IncrementalJobManager) startJob(name string) { } // Insert inserts incremental-learning-job config to db -func (im *IncrementalJobManager) Insert(message *gmclient.Message) error { +func (im *Manager) Insert(message 
*clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) first := false job, ok := im.IncrementalJobMap[name] if !ok { - job = &IncrementalLearningJob{} + job = &Job{} job.Storage = storage.Storage{IsLocalStorage: false} job.Done = make(chan struct{}) im.IncrementalJobMap[name] = job @@ -338,7 +348,7 @@ func (im *IncrementalJobManager) Insert(message *gmclient.Message) error { return err } - credential := job.ObjectMeta.Annotations[CredentialAnnotationKey] + credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] if credential != "" { if err := job.Storage.SetCredential(credential); err != nil { return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err) @@ -357,7 +367,7 @@ func (im *IncrementalJobManager) Insert(message *gmclient.Message) error { } // Delete deletes incremental-learning-job config in db -func (im *IncrementalJobManager) Delete(message *gmclient.Message) error { +func (im *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) if job, ok := im.IncrementalJobMap[name]; ok && job.Done != nil { @@ -374,7 +384,7 @@ func (im *IncrementalJobManager) Delete(message *gmclient.Message) error { } // initJob inits the job object -func (im *IncrementalJobManager) initJob(job *IncrementalLearningJob) error { +func (im *Manager) initJob(job *Job) error { jobConfig := job.JobConfig jobConfig.Lock = sync.Mutex{} @@ -433,7 +443,7 @@ func newTrigger(t sednav1.Trigger) (trigger.Base, error) { } // getTrainOrEvalModel gets train model or eval model from job conditions -func (im *IncrementalJobManager) getTrainOrEvalModel(job *IncrementalLearningJob, jobStage sednav1.ILJobStage) *ModelInfo { +func (im *Manager) getTrainOrEvalModel(job *Job, jobStage sednav1.ILJobStage) *Model { jobConditions := job.Status.Conditions // 
TODO: runtime.type changes to common.type for gm and lc @@ -465,16 +475,16 @@ func (im *IncrementalJobManager) getTrainOrEvalModel(job *IncrementalLearningJob switch jobStage { case sednav1.ILJobTrain: - return &ModelInfo{Format: models[1].Format, URL: models[1].URL} + return &Model{Format: models[1].Format, URL: models[1].URL} case sednav1.ILJobEval: - return &ModelInfo{Format: models[0].Format, URL: models[0].URL} + return &Model{Format: models[0].Format, URL: models[0].URL} } return nil } // triggerTrainTask triggers the train task -func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) (interface{}, bool, error) { +func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { var err error jobConfig := job.JobConfig @@ -489,7 +499,7 @@ func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) ( return nil, false, nil } - var m *ModelInfo + var m *Model latestCondition := im.getLatestCondition(job) rounds := jobConfig.Rounds @@ -519,13 +529,13 @@ func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) ( outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir) } - input := WorkerInput{ - Models: []ModelInfo{*m}, + input := clienttypes.Input{ + Models: []Model{*m}, DataURL: dataURL, DataIndexURL: dataIndexURL, OutputDir: outputDir, } - msg := UpstreamMessage{ + msg := clienttypes.UpstreamMessage{ Phase: string(sednav1.ILJobTrain), Status: string(sednav1.ILJobStageCondReady), Input: &input, @@ -535,7 +545,7 @@ func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) ( } // triggerEvalTask triggers the eval task -func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (*UpstreamMessage, error) { +func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, error) { jobConfig := job.JobConfig var err error @@ -546,7 +556,7 @@ func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (* return nil, err 
} - models := []ModelInfo{*m, { + models := []Model{*m, { Format: jobConfig.DeployModel.Format, URL: jobConfig.DeployModel.URL, }} @@ -569,12 +579,12 @@ func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (* dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL) } - input := WorkerInput{ + input := clienttypes.Input{ Models: models, DataURL: dataURL, DataIndexURL: dataIndexURL, } - msg := &UpstreamMessage{ + msg := &clienttypes.UpstreamMessage{ Phase: string(sednav1.ILJobEval), Status: string(sednav1.ILJobStageCondReady), Input: &input, @@ -584,7 +594,7 @@ func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (* } // triggerDeployTask triggers the deploy task -func (im *IncrementalJobManager) triggerDeployTask(job *IncrementalLearningJob) (bool, error) { +func (im *Manager) triggerDeployTask(job *Job) (bool, error) { jobConfig := job.JobConfig // EvalResult must has two models info, first is trained model, second is deployed model. 
@@ -592,7 +602,28 @@ func (im *IncrementalJobManager) triggerDeployTask(job *IncrementalLearningJob) return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult)) } - newMetrics, oldMetrics := jobConfig.EvalResult[0].Metrics, jobConfig.EvalResult[1].Metrics + getMetrics := func(metrics map[string]interface{}) (map[string][]float64, error) { + var err error + bytes, err := json.Marshal(metrics) + if err != nil { + return nil, err + } + + data := make(map[string][]float64) + if err := json.Unmarshal(bytes, &data); err != nil { + return nil, err + } + return data, err + } + + newMetrics, err := getMetrics(jobConfig.EvalResult[0].Metrics) + if err != nil { + return false, err + } + oldMetrics, err := getMetrics(jobConfig.EvalResult[1].Metrics) + if err != nil { + return false, err + } metricDelta := make(map[string]interface{}) for metric := range newMetrics { @@ -622,7 +653,7 @@ func (im *IncrementalJobManager) triggerDeployTask(job *IncrementalLearningJob) } // deployModel deploys model -func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*ModelInfo, error) { +func (im *Manager) deployModel(job *Job) (*Model, error) { jobConfig := job.JobConfig trainedModel := jobConfig.EvalModels[0].URL @@ -640,7 +671,7 @@ func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*Mode return &jobConfig.EvalModels[0], nil } -func (job *IncrementalLearningJob) updateDeployModel(deployModel string, newModel string) error { +func (job *Job) updateDeployModel(deployModel string, newModel string) error { if err := job.Storage.CopyFile(newModel, deployModel); err != nil { return fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v", newModel, deployModel, err) @@ -651,7 +682,7 @@ func (job *IncrementalLearningJob) updateDeployModel(deployModel string, newMode } // createOutputDir creates the job output dir -func (job *IncrementalLearningJob) createOutputDir(jobConfig *JobConfig) error { 
+func (job *Job) createOutputDir(jobConfig *JobConfig) error { outputDir := jobConfig.OutputDir dirNames := []string{"data/train", "data/eval", "train", "eval"} @@ -684,7 +715,7 @@ func (job *IncrementalLearningJob) createOutputDir(jobConfig *JobConfig) error { return nil } -func (im *IncrementalJobManager) getLatestCondition(job *IncrementalLearningJob) sednav1.ILJobCondition { +func (im *Manager) getLatestCondition(job *Job) sednav1.ILJobCondition { jobConditions := job.Status.Conditions var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{} if len(jobConditions) > 0 { @@ -694,8 +725,8 @@ func (im *IncrementalJobManager) getLatestCondition(job *IncrementalLearningJob) return latestCondition } -func (im *IncrementalJobManager) getModel(namespace string, name string) (sednav1.Model, error) { - modelName := util.GetUniqueIdentifier(namespace, name, ModelResourceKind) +func (im *Manager) getModel(namespace string, name string) (sednav1.Model, error) { + modelName := util.GetUniqueIdentifier(namespace, name, model.KindName) model, ok := im.ModelManager.GetModel(modelName) if !ok { return model, fmt.Errorf("not exists model(name=%s)", modelName) @@ -704,7 +735,7 @@ func (im *IncrementalJobManager) getModel(namespace string, name string) (sednav } // loadTrainModel loads initial model information for training. -func (im *IncrementalJobManager) loadTrainModel(job *IncrementalLearningJob) error { +func (im *Manager) loadTrainModel(job *Job) error { jobConfig := job.JobConfig if jobConfig.TrainModel == nil { @@ -713,7 +744,7 @@ func (im *IncrementalJobManager) loadTrainModel(job *IncrementalLearningJob) err return err } - jobConfig.TrainModel = new(ModelInfo) + jobConfig.TrainModel = new(Model) format := initialModel.Spec.Format url := initialModel.Spec.URL jobConfig.TrainModel.Format = format @@ -723,7 +754,7 @@ func (im *IncrementalJobManager) loadTrainModel(job *IncrementalLearningJob) err } // loadDeployModel loads model information for deploying. 
-func (im *IncrementalJobManager) loadDeployModel(job *IncrementalLearningJob) error { +func (im *Manager) loadDeployModel(job *Job) error { jobConfig := job.JobConfig if jobConfig.DeployModel == nil { @@ -732,7 +763,7 @@ func (im *IncrementalJobManager) loadDeployModel(job *IncrementalLearningJob) er return err } - jobConfig.DeployModel = new(ModelInfo) + jobConfig.DeployModel = new(Model) jobConfig.DeployModel.Format = evalModel.Spec.Format jobConfig.DeployModel.URL = evalModel.Spec.URL } @@ -740,13 +771,13 @@ func (im *IncrementalJobManager) loadDeployModel(job *IncrementalLearningJob) er } // loadDataset loads dataset information -func (im *IncrementalJobManager) loadDataset(job *IncrementalLearningJob) error { +func (im *Manager) loadDataset(job *Job) error { if job.Dataset != nil { // already loaded return nil } - datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, DatasetResourceKind) + datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, dataset.KindName) dataset, ok := im.DatasetManager.GetDataset(datasetName) if !ok || dataset == nil { return fmt.Errorf("not exists dataset(name=%s)", datasetName) @@ -765,7 +796,7 @@ func (im *IncrementalJobManager) loadDataset(job *IncrementalLearningJob) error } // handleData updates samples information -func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) { +func (im *Manager) handleData(job *Job) { tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second) jobConfig := job.JobConfig @@ -824,7 +855,7 @@ func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) { // createFile creates data file and data index file func createFile(dir string, format string, isLocalStorage bool) (string, string) { switch format { - case "txt": + case dataset.TXTFormat: if isLocalStorage { return path.Join(dir, "data.txt"), "" } @@ -834,7 +865,7 @@ func createFile(dir string, format string, isLocalStorage bool) (string, string) } // writeSamples 
writes samples information to a file -func (im *IncrementalJobManager) writeSamples(job *IncrementalLearningJob, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) { +func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) { subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/") fileURL, absURLFile := createFile(subDir, format, job.Dataset.Storage.IsLocalStorage) @@ -888,7 +919,7 @@ func (im *IncrementalJobManager) writeSamples(job *IncrementalLearningJob, sampl } // writeByLine writes file by line -func (im *IncrementalJobManager) writeByLine(samples []string, fileURL string) error { +func (im *Manager) writeByLine(samples []string, fileURL string) error { file, err := os.Create(fileURL) if err != nil { klog.Errorf("create file(%s) failed", fileURL) @@ -913,7 +944,7 @@ func (im *IncrementalJobManager) writeByLine(samples []string, fileURL string) e } // monitorWorker monitors message from worker -func (im *IncrementalJobManager) monitorWorker() { +func (im *Manager) monitorWorker() { for { workerMessageChannel := im.WorkerMessageChannel workerMessage, ok := <-workerMessageChannel @@ -930,23 +961,27 @@ func (im *IncrementalJobManager) monitorWorker() { } // TODO: filter some worker messages out - wo := WorkerOutput{} + wo := clienttypes.Output{} wo.Models = workerMessage.Results wo.OwnerInfo = workerMessage.OwnerInfo - msg := &UpstreamMessage{ + msg := &clienttypes.UpstreamMessage{ Phase: workerMessage.Kind, Status: workerMessage.Status, Output: &wo, } - im.Client.WriteMessage(msg, job.getHeader()) + + if err := im.Client.WriteMessage(msg, job.getHeader()); err != nil { + klog.Errorf("job(name=%s) failed to write message: %v", name, err) + continue + } im.handleWorkerMessage(job, workerMessage) } } // handleWorkerMessage handles message from worker -func (im *IncrementalJobManager) handleWorkerMessage(job 
*IncrementalLearningJob, workerMessage WorkerMessage) { +func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) { latestCond := im.getLatestCondition(job) jobStage := strings.ToLower(string(latestCond.Stage)) workerKind := strings.ToLower(workerMessage.Kind) @@ -957,9 +992,9 @@ func (im *IncrementalJobManager) handleWorkerMessage(job *IncrementalLearningJob return } - var models []ModelInfo + var models []Model for _, result := range workerMessage.Results { - metrics := map[string][]float64{} + metrics := make(map[string]interface{}) if m, ok := result["metrics"]; ok { bytes, err := json.Marshal(m) if err != nil { @@ -975,17 +1010,17 @@ func (im *IncrementalJobManager) handleWorkerMessage(job *IncrementalLearningJob } } - model := ModelInfo{ - result["format"].(string), - result["url"].(string), - metrics} + model := Model{ + Format: result["format"].(string), + URL: result["url"].(string), + Metrics: metrics} models = append(models, model) } workerStatus := workerMessage.Status jobName := job.JobConfig.UniqueIdentifier - if workerStatus == WorkerCompletedStatus { + if workerStatus == workertypes.CompletedStatus { klog.Infof("job(name=%s) complete the %s task successfully", jobName, jobStage) switch latestCond.Stage { case sednav1.ILJobEval: @@ -1011,20 +1046,20 @@ func forwardSamples(jobConfig *JobConfig, jobStage sednav1.ILJobStage) { } // AddWorkerMessage adds worker messages -func (im *IncrementalJobManager) AddWorkerMessage(message WorkerMessage) { +func (im *Manager) AddWorkerMessage(message workertypes.MessageContent) { im.WorkerMessageChannel <- message } // GetName returns name of the manager -func (im *IncrementalJobManager) GetName() string { - return IncrementalLearningJobKind +func (im *Manager) GetName() string { + return KindName } -func (job *IncrementalLearningJob) getHeader() gmclient.MessageHeader { - return gmclient.MessageHeader{ +func (job *Job) getHeader() clienttypes.MessageHeader { + return 
clienttypes.MessageHeader{ Namespace: job.Namespace, ResourceKind: job.Kind, ResourceName: job.Name, - Operation: gmclient.StatusOperation, + Operation: clienttypes.StatusOperation, } } diff --git a/pkg/localcontroller/manager/jointinferenceservice.go b/pkg/localcontroller/managers/jointinference/jointinferenceservice.go similarity index 68% rename from pkg/localcontroller/manager/jointinferenceservice.go rename to pkg/localcontroller/managers/jointinference/jointinferenceservice.go index e202bb4a2..3bb208e68 100644 --- a/pkg/localcontroller/manager/jointinferenceservice.go +++ b/pkg/localcontroller/managers/jointinference/jointinferenceservice.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package manager +package jointinference import ( "encoding/json" @@ -23,40 +23,42 @@ import ( sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/localcontroller/db" - "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + types "github.com/kubeedge/sedna/pkg/localcontroller/managers" "github.com/kubeedge/sedna/pkg/localcontroller/util" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" ) // JointInferenceManager defines joint-inference-service manager -type JointInferenceManager struct { - Client gmclient.ClientI - WorkerMessageChannel chan WorkerMessage +type Manager struct { + Client clienttypes.ClientI + WorkerMessageChannel chan workertypes.MessageContent } const ( - // JointInferenceServiceKind is kind of joint-inference-service resource - JointInferenceServiceKind = "jointinferenceservice" + // KindName is kind of joint-inference-service resource + KindName = "jointinferenceservice" ) -// NewJointInferenceManager creates a joint inference manager -func NewJointInferenceManager(client gmclient.ClientI) FeatureManager { - jm := &JointInferenceManager{ +// New creates a 
joint inference manager +func New(client clienttypes.ClientI) types.FeatureManager { + jm := &Manager{ Client: client, - WorkerMessageChannel: make(chan WorkerMessage, WorkerMessageChannelCacheSize), + WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize), } return jm } // Start starts joint-inference-service manager -func (jm *JointInferenceManager) Start() error { +func (jm *Manager) Start() error { go jm.monitorWorker() return nil } // monitorWorker monitors message from worker -func (jm *JointInferenceManager) monitorWorker() { +func (jm *Manager) monitorWorker() { for { workerMessageChannel := jm.WorkerMessageChannel workerMessage, ok := <-workerMessageChannel @@ -65,17 +67,17 @@ func (jm *JointInferenceManager) monitorWorker() { } name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind) - header := gmclient.MessageHeader{ + header := clienttypes.MessageHeader{ Namespace: workerMessage.Namespace, ResourceKind: workerMessage.OwnerKind, ResourceName: workerMessage.OwnerName, - Operation: gmclient.StatusOperation, + Operation: clienttypes.StatusOperation, } - um := UpstreamMessage{ + um := clienttypes.UpstreamMessage{ Phase: workerMessage.Kind, Status: workerMessage.Status, - Output: &WorkerOutput{ + Output: &clienttypes.Output{ OwnerInfo: workerMessage.OwnerInfo, }, } @@ -88,7 +90,7 @@ func (jm *JointInferenceManager) monitorWorker() { } // Insert inserts joint-inference-service config in db -func (jm *JointInferenceManager) Insert(message *gmclient.Message) error { +func (jm *Manager) Insert(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) ji := sednav1.JointInferenceService{} @@ -105,7 +107,7 @@ func (jm *JointInferenceManager) Insert(message *gmclient.Message) error { } // Delete deletes joint-inference-service config in db -func (jm *JointInferenceManager) 
Delete(message *gmclient.Message) error { +func (jm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) if err := db.DeleteResource(name); err != nil { return err @@ -115,11 +117,11 @@ func (jm *JointInferenceManager) Delete(message *gmclient.Message) error { } // AddWorkerMessage adds worker messages -func (jm *JointInferenceManager) AddWorkerMessage(message WorkerMessage) { +func (jm *Manager) AddWorkerMessage(message workertypes.MessageContent) { jm.WorkerMessageChannel <- message } // GetName gets kind of the manager -func (jm *JointInferenceManager) GetName() string { - return JointInferenceServiceKind +func (jm *Manager) GetName() string { + return KindName } diff --git a/pkg/localcontroller/manager/lifelonglearningjob.go b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go similarity index 82% rename from pkg/localcontroller/manager/lifelonglearningjob.go rename to pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go index c6e287d61..114ccff0e 100644 --- a/pkg/localcontroller/manager/lifelonglearningjob.go +++ b/pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package manager +package lifelonglearning import ( "bufio" @@ -31,31 +31,46 @@ import ( "github.com/kubeedge/sedna/cmd/sedna-lc/app/options" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" + "github.com/kubeedge/sedna/pkg/globalmanager/runtime" "github.com/kubeedge/sedna/pkg/localcontroller/db" - "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset" "github.com/kubeedge/sedna/pkg/localcontroller/storage" "github.com/kubeedge/sedna/pkg/localcontroller/trigger" "github.com/kubeedge/sedna/pkg/localcontroller/util" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" ) const ( - //LifelongLearningJobKind is kind of lifelong-learning-job resource - LifelongLearningJobKind = "lifelonglearningjob" + //KindName is kind of lifelong-learning-job resource + KindName = "lifelonglearningjob" + + // TrainPhase is the train phase + TrainPhase = "train" + // EvalPhase is the eval phase + EvalPhase = "eval" + // DeployPhase is the deploy phase + DeployPhase = "deploy" + + // TriggerReadyStatus is the ready status about trigger + TriggerReadyStatus = "ready" + // TriggerCompletedStatus is the completed status about trigger + TriggerCompletedStatus = "completed" ) // LifelongLearningJobManager defines lifelong-learning-job Manager -type LifelongLearningJobManager struct { - Client gmclient.ClientI - WorkerMessageChannel chan WorkerMessage - DatasetManager *DatasetManager - LifelongLearningJobMap map[string]*LifelongLearningJob +type Manager struct { + Client clienttypes.ClientI + WorkerMessageChannel chan workertypes.MessageContent + DatasetManager *dataset.Manager + LifelongLearningJobMap map[string]*Job VolumeMountPrefix string } // LifelongLearningJob defines config for lifelong-learning-job -type LifelongLearningJob struct { +type Job struct { sednav1.LifelongLearningJob - Dataset *Dataset + Dataset 
*dataset.Dataset Done chan struct{} Storage storage.Storage JobConfig *LLJobConfig @@ -75,12 +90,14 @@ type LLJobConfig struct { OutputDir string OutputConfig *LLOutputConfig DataSamples *LLDataSamples - TrainModel *ModelInfo - DeployModel *ModelInfo - EvalResult *ModelInfo + TrainModel *Model + DeployModel *Model + EvalResult *Model Lock sync.Mutex } +type Model = clienttypes.Model + // LLOutputConfig defines config for job output type LLOutputConfig struct { SamplesOutput map[string]string @@ -105,14 +122,13 @@ const ( LLEvalSamplesCapacity = 5 ) -// NewLifelongLearningJobManager creates a lifelong-learning-job manager -func NewLifelongLearningJobManager(client gmclient.ClientI, datasetManager *DatasetManager, - modelManager *ModelManager, options *options.LocalControllerOptions) *LifelongLearningJobManager { - lm := LifelongLearningJobManager{ +// New creates a lifelong-learning-job manager +func New(client clienttypes.ClientI, datasetManager *dataset.Manager, options *options.LocalControllerOptions) *Manager { + lm := Manager{ Client: client, - WorkerMessageChannel: make(chan WorkerMessage, WorkerMessageChannelCacheSize), + WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize), DatasetManager: datasetManager, - LifelongLearningJobMap: make(map[string]*LifelongLearningJob), + LifelongLearningJobMap: make(map[string]*Job), VolumeMountPrefix: options.VolumeMountPrefix, } @@ -120,13 +136,13 @@ func NewLifelongLearningJobManager(client gmclient.ClientI, datasetManager *Data } // Insert inserts lifelong-learning-job config to db -func (lm *LifelongLearningJobManager) Insert(message *gmclient.Message) error { +func (lm *Manager) Insert(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) first := false job, ok := lm.LifelongLearningJobMap[name] if !ok { - job = &LifelongLearningJob{} + job = &Job{} job.Storage = 
storage.Storage{IsLocalStorage: false} job.Done = make(chan struct{}) lm.LifelongLearningJobMap[name] = job @@ -137,7 +153,7 @@ func (lm *LifelongLearningJobManager) Insert(message *gmclient.Message) error { return err } - credential := job.ObjectMeta.Annotations[CredentialAnnotationKey] + credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey] if credential != "" { if err := job.Storage.SetCredential(credential); err != nil { return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err) @@ -156,7 +172,7 @@ func (lm *LifelongLearningJobManager) Insert(message *gmclient.Message) error { } // startJob starts a job -func (lm *LifelongLearningJobManager) startJob(name string) { +func (lm *Manager) startJob(name string) { var err error job, ok := lm.LifelongLearningJobMap[name] if !ok { @@ -215,10 +231,10 @@ func (lm *LifelongLearningJobManager) startJob(name string) { } // trainTask starts training task -func (lm *LifelongLearningJobManager) trainTask(job *LifelongLearningJob) error { +func (lm *Manager) trainTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == WorkerReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { + if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { payload, ok, err := lm.triggerTrainTask(job) if !ok { return nil @@ -243,13 +259,13 @@ func (lm *LifelongLearningJobManager) trainTask(job *LifelongLearningJob) error jobConfig.UniqueIdentifier, jobConfig.Phase) } - if jobConfig.WorkerStatus == WorkerFailedStatus { + if jobConfig.WorkerStatus == workertypes.FailedStatus { klog.Warningf("found the %sing phase worker that ran failed, "+ "back the training phase triggering task", jobConfig.Phase) backLLTaskStatus(jobConfig) } - if jobConfig.WorkerStatus == WorkerCompletedStatus { + if jobConfig.WorkerStatus == workertypes.CompletedStatus { klog.Infof("job(name=%s) complete the %s task successfully", 
jobConfig.UniqueIdentifier, jobConfig.Phase) nextLLTask(jobConfig) } @@ -258,10 +274,10 @@ func (lm *LifelongLearningJobManager) trainTask(job *LifelongLearningJob) error } // evalTask starts eval task -func (lm *LifelongLearningJobManager) evalTask(job *LifelongLearningJob) error { +func (lm *Manager) evalTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == WorkerReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { + if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { payload, err := lm.triggerEvalTask(job) if err != nil { klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v", @@ -280,14 +296,14 @@ func (lm *LifelongLearningJobManager) evalTask(job *LifelongLearningJob) error { jobConfig.UniqueIdentifier, jobConfig.Phase) } - if jobConfig.WorkerStatus == WorkerFailedStatus { + if jobConfig.WorkerStatus == workertypes.FailedStatus { msg := fmt.Sprintf("job(name=%s) found the %sing phase worker that ran failed, "+ "back the training phase triggering task", jobConfig.UniqueIdentifier, jobConfig.Phase) klog.Errorf(msg) return fmt.Errorf(msg) } - if jobConfig.WorkerStatus == WorkerCompletedStatus { + if jobConfig.WorkerStatus == workertypes.CompletedStatus { klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, jobConfig.Phase) nextLLTask(jobConfig) } @@ -296,11 +312,11 @@ func (lm *LifelongLearningJobManager) evalTask(job *LifelongLearningJob) error { } // deployTask starts deploy task -func (lm *LifelongLearningJobManager) deployTask(job *LifelongLearningJob) error { +func (lm *Manager) deployTask(job *Job) error { jobConfig := job.JobConfig - if jobConfig.WorkerStatus == WorkerReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { - status := UpstreamMessage{} + if jobConfig.WorkerStatus == workertypes.ReadyStatus && jobConfig.TriggerStatus == TriggerReadyStatus { + status := 
clienttypes.UpstreamMessage{} status.Phase = DeployPhase deployModel, err := lm.deployModel(job) if err != nil { @@ -309,11 +325,11 @@ func (lm *LifelongLearningJobManager) deployTask(job *LifelongLearningJob) error klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier) } if err != nil || deployModel == nil { - status.Status = WorkerFailedStatus + status.Status = workertypes.FailedStatus } else { - status.Status = WorkerReadyStatus - status.Input = &WorkerInput{ - Models: []ModelInfo{ + status.Status = workertypes.ReadyStatus + status.Input = &clienttypes.Input{ + Models: []Model{ *deployModel, }, } @@ -334,7 +350,7 @@ func (lm *LifelongLearningJobManager) deployTask(job *LifelongLearningJob) error } // triggerTrainTask triggers the train task -func (lm *LifelongLearningJobManager) triggerTrainTask(job *LifelongLearningJob) (interface{}, bool, error) { +func (lm *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) { var err error jobConfig := job.JobConfig @@ -367,14 +383,14 @@ func (lm *LifelongLearningJobManager) triggerTrainTask(job *LifelongLearningJob) outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) } - input := WorkerInput{ + input := clienttypes.Input{ DataURL: dataURL, DataIndexURL: dataIndexURL, OutputDir: outputDir, } - msg := UpstreamMessage{ + msg := clienttypes.UpstreamMessage{ Phase: TrainPhase, - Status: WorkerReadyStatus, + Status: workertypes.ReadyStatus, Input: &input, } jobConfig.TriggerTime = time.Now() @@ -382,7 +398,7 @@ func (lm *LifelongLearningJobManager) triggerTrainTask(job *LifelongLearningJob) } // triggerEvalTask triggers the eval task -func (lm *LifelongLearningJobManager) triggerEvalTask(job *LifelongLearningJob) (*UpstreamMessage, error) { +func (lm *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, error) { jobConfig := job.JobConfig var err error @@ -394,8 +410,8 @@ func (lm *LifelongLearningJobManager) triggerEvalTask(job *LifelongLearningJob) return nil, 
err } - var models []ModelInfo - models = append(models, ModelInfo{ + var models []Model + models = append(models, Model{ Format: jobConfig.TrainModel.Format, URL: jobConfig.TrainModel.URL, }) @@ -408,15 +424,15 @@ func (lm *LifelongLearningJobManager) triggerEvalTask(job *LifelongLearningJob) outputDir = util.TrimPrefixPath(lm.VolumeMountPrefix, outputDir) } - input := WorkerInput{ + input := clienttypes.Input{ Models: models, DataURL: dataURL, DataIndexURL: dataIndexURL, OutputDir: outputDir, } - msg := &UpstreamMessage{ + msg := &clienttypes.UpstreamMessage{ Phase: EvalPhase, - Status: WorkerReadyStatus, + Status: workertypes.ReadyStatus, Input: &input, } @@ -424,10 +440,10 @@ func (lm *LifelongLearningJobManager) triggerEvalTask(job *LifelongLearningJob) } // deployModel deploys model -func (lm *LifelongLearningJobManager) deployModel(job *LifelongLearningJob) (*ModelInfo, error) { +func (lm *Manager) deployModel(job *Job) (*Model, error) { jobConfig := job.JobConfig - model := &ModelInfo{} + model := &Model{} model = jobConfig.EvalResult if job.Storage.IsLocalStorage { @@ -447,7 +463,7 @@ func (lm *LifelongLearningJobManager) deployModel(job *LifelongLearningJob) (*Mo } // createOutputDir creates the job output dir -func (job *LifelongLearningJob) createOutputDir(jobConfig *LLJobConfig) error { +func (job *Job) createOutputDir(jobConfig *LLJobConfig) error { outputDir := jobConfig.OutputDir dirNames := []string{"data/train", "data/eval", "train", "eval"} @@ -482,14 +498,14 @@ func (job *LifelongLearningJob) createOutputDir(jobConfig *LLJobConfig) error { } // createFile creates data file and data index file -func (job *LifelongLearningJob) createFile(dir string, format string, isLocalStorage bool) (string, string) { +func (job *Job) createFile(dir string, format string, isLocalStorage bool) (string, string) { switch strings.ToLower(format) { - case DatasetFormatTXT: + case dataset.TXTFormat: if isLocalStorage { return path.Join(dir, "data.txt"), "" } return 
strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/") - case DatasetFormatCSV: + case dataset.CSVFormat: return strings.Join([]string{dir, "data.csv"}, "/"), "" } @@ -497,7 +513,7 @@ func (job *LifelongLearningJob) createFile(dir string, format string, isLocalSto } // writeLLJSamples writes samples information to a file -func (job *LifelongLearningJob) writeLLJSamples(samples []string, dir string) (string, string, error) { +func (job *Job) writeLLJSamples(samples []string, dir string) (string, string, error) { version := job.JobConfig.Version format := job.Dataset.Spec.Format urlPrefix := job.Dataset.URLPrefix @@ -558,7 +574,7 @@ func (job *LifelongLearningJob) writeLLJSamples(samples []string, dir string) (s } // writeByLine writes file by line -func (job *LifelongLearningJob) writeByLine(samples []string, fileURL string, format string) error { +func (job *Job) writeByLine(samples []string, fileURL string, format string) error { file, err := os.Create(fileURL) if err != nil { klog.Errorf("create file(%s) failed", fileURL) @@ -588,7 +604,7 @@ func (job *LifelongLearningJob) writeByLine(samples []string, fileURL string, fo } // handleData updates samples information -func (lm *LifelongLearningJobManager) handleData(job *LifelongLearningJob) { +func (lm *Manager) handleData(job *Job) { tick := time.NewTicker(LLHandlerDataIntervalSeconds * time.Second) jobConfig := job.JobConfig @@ -643,13 +659,13 @@ func (lm *LifelongLearningJobManager) handleData(job *LifelongLearningJob) { } } -func (lm *LifelongLearningJobManager) loadDataset(job *LifelongLearningJob) error { +func (lm *Manager) loadDataset(job *Job) error { if job.Dataset != nil { // already loaded return nil } - datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, DatasetResourceKind) + datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, dataset.KindName) dataset, ok := lm.DatasetManager.GetDataset(datasetName) if !ok || 
dataset == nil { return fmt.Errorf("not exists dataset(name=%s)", datasetName) @@ -668,15 +684,15 @@ func (lm *LifelongLearningJobManager) loadDataset(job *LifelongLearningJob) erro } // initJob inits the job object -func (lm *LifelongLearningJobManager) initJob(job *LifelongLearningJob) error { +func (lm *Manager) initJob(job *Job) error { jobConfig := job.JobConfig - jobConfig.TrainModel = new(ModelInfo) - jobConfig.EvalResult = new(ModelInfo) + jobConfig.TrainModel = new(Model) + jobConfig.EvalResult = new(Model) jobConfig.Lock = sync.Mutex{} jobConfig.Version = 0 jobConfig.Phase = TrainPhase - jobConfig.WorkerStatus = WorkerReadyStatus + jobConfig.WorkerStatus = workertypes.ReadyStatus jobConfig.TriggerStatus = TriggerReadyStatus trainTrigger, err := newLLTrigger(job.Spec.TrainSpec.Trigger) if err != nil { @@ -702,7 +718,7 @@ func (lm *LifelongLearningJobManager) initJob(job *LifelongLearningJob) error { return err } - jobConfig.DeployModel = &ModelInfo{ + jobConfig.DeployModel = &Model{ Format: "pkl", URL: strings.Join([]string{strings.TrimRight(outputDir, "/"), "deploy/index.pkl"}, "/"), } @@ -751,7 +767,7 @@ func backLLTaskStatus(jobConfig *LLJobConfig) { // initLLTaskStatus inits task status func initLLTaskStatus(jobConfig *LLJobConfig) { - jobConfig.WorkerStatus = WorkerReadyStatus + jobConfig.WorkerStatus = workertypes.ReadyStatus jobConfig.TriggerStatus = TriggerReadyStatus } @@ -779,7 +795,7 @@ func nextLLTask(jobConfig *LLJobConfig) { } // Delete deletes lifelong-learning-job config in db -func (lm *LifelongLearningJobManager) Delete(message *gmclient.Message) error { +func (lm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) if job, ok := lm.LifelongLearningJobMap[name]; ok && job.Done != nil { @@ -796,14 +812,14 @@ func (lm *LifelongLearningJobManager) Delete(message *gmclient.Message) error { } // Start starts 
LifelongLearningJob manager -func (lm *LifelongLearningJobManager) Start() error { +func (lm *Manager) Start() error { go lm.monitorWorker() return nil } // monitorWorker monitors message from worker -func (lm *LifelongLearningJobManager) monitorWorker() { +func (lm *Manager) monitorWorker() { for { workerMessageChannel := lm.WorkerMessageChannel workerMessage, ok := <-workerMessageChannel @@ -820,11 +836,11 @@ func (lm *LifelongLearningJobManager) monitorWorker() { } // TODO: filter some worker messages out - wo := WorkerOutput{} + wo := clienttypes.Output{} wo.Models = workerMessage.Results wo.OwnerInfo = workerMessage.OwnerInfo - msg := &UpstreamMessage{ + msg := &clienttypes.UpstreamMessage{ Phase: workerMessage.Kind, Status: workerMessage.Status, Output: &wo, @@ -836,7 +852,7 @@ func (lm *LifelongLearningJobManager) monitorWorker() { } // handleWorkerMessage handles message from worker -func (lm *LifelongLearningJobManager) handleWorkerMessage(job *LifelongLearningJob, workerMessage WorkerMessage) { +func (lm *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) { jobPhase := job.JobConfig.Phase workerKind := workerMessage.Kind if jobPhase != workerKind { @@ -845,15 +861,15 @@ func (lm *LifelongLearningJobManager) handleWorkerMessage(job *LifelongLearningJ return } - var models []*ModelInfo + var models []*Model for _, result := range workerMessage.Results { - model := ModelInfo{ + model := Model{ Format: result["format"].(string), URL: result["url"].(string)} models = append(models, &model) } - model := &ModelInfo{} + model := &Model{} if len(models) != 1 { return } @@ -861,7 +877,7 @@ func (lm *LifelongLearningJobManager) handleWorkerMessage(job *LifelongLearningJ job.JobConfig.WorkerStatus = workerMessage.Status - if job.JobConfig.WorkerStatus == WorkerCompletedStatus { + if job.JobConfig.WorkerStatus == workertypes.CompletedStatus { switch job.JobConfig.Phase { case TrainPhase: job.JobConfig.TrainModel = model @@ -872,20 +888,20 
@@ func (lm *LifelongLearningJobManager) handleWorkerMessage(job *LifelongLearningJ } // AddWorkerMessage adds worker messages -func (lm *LifelongLearningJobManager) AddWorkerMessage(message WorkerMessage) { +func (lm *Manager) AddWorkerMessage(message workertypes.MessageContent) { lm.WorkerMessageChannel <- message } // GetName returns name of the manager -func (lm *LifelongLearningJobManager) GetName() string { - return LifelongLearningJobKind +func (lm *Manager) GetName() string { + return KindName } -func (job *LifelongLearningJob) getHeader() gmclient.MessageHeader { - return gmclient.MessageHeader{ +func (job *Job) getHeader() clienttypes.MessageHeader { + return clienttypes.MessageHeader{ Namespace: job.Namespace, ResourceKind: job.Kind, ResourceName: job.Name, - Operation: gmclient.StatusOperation, + Operation: clienttypes.StatusOperation, } } diff --git a/pkg/localcontroller/manager/model.go b/pkg/localcontroller/managers/model/model.go similarity index 68% rename from pkg/localcontroller/manager/model.go rename to pkg/localcontroller/managers/model/model.go index 37c8aafcf..b81d01204 100644 --- a/pkg/localcontroller/manager/model.go +++ b/pkg/localcontroller/managers/model/model.go @@ -14,33 +14,32 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package manager +package model import ( "encoding/json" sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1" "github.com/kubeedge/sedna/pkg/localcontroller/db" - "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" "github.com/kubeedge/sedna/pkg/localcontroller/util" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" ) // ModelManager defines model manager -type ModelManager struct { - Client gmclient.ClientI +type Manager struct { + Client clienttypes.ClientI ModelMap map[string]sednav1.Model } const ( - // ModelCacheSize is size of cache - ModelCacheSize = 100 - // ModelResourceKind is kind of dataset resource - ModelResourceKind = "model" + // KindName is kind of model resource + KindName = "model" ) -// NewModelManager creates a model manager -func NewModelManager(client gmclient.ClientI) *ModelManager { - mm := ModelManager{ +// New creates a model manager +func New(client clienttypes.ClientI) *Manager { + mm := Manager{ ModelMap: make(map[string]sednav1.Model), Client: client, } @@ -49,23 +48,23 @@ func NewModelManager(client gmclient.ClientI) *ModelManager { } // Start starts model manager -func (mm *ModelManager) Start() error { +func (mm *Manager) Start() error { return nil } // GetModel gets model -func (mm *ModelManager) GetModel(name string) (sednav1.Model, bool) { +func (mm *Manager) GetModel(name string) (sednav1.Model, bool) { model, ok := mm.ModelMap[name] return model, ok } // addNewModel adds model -func (mm *ModelManager) addNewModel(name string, model sednav1.Model) { +func (mm *Manager) addNewModel(name string, model sednav1.Model) { mm.ModelMap[name] = model } // insertModel inserts model config to db -func (mm *ModelManager) Insert(message *gmclient.Message) error { +func (mm *Manager) Insert(message *clienttypes.Message) error { model := sednav1.Model{} name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, 
message.Header.ResourceKind) @@ -83,7 +82,7 @@ func (mm *ModelManager) Insert(message *gmclient.Message) error { } // Delete deletes model in db -func (mm *ModelManager) Delete(message *gmclient.Message) error { +func (mm *Manager) Delete(message *clienttypes.Message) error { name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind) delete(mm.ModelMap, name) @@ -95,10 +94,10 @@ func (mm *ModelManager) Delete(message *gmclient.Message) error { return nil } -func (mm *ModelManager) GetName() string { - return ModelResourceKind +func (mm *Manager) GetName() string { + return KindName } -func (mm *ModelManager) AddWorkerMessage(message WorkerMessage) { +func (mm *Manager) AddWorkerMessage(message workertypes.MessageContent) { // dummy } diff --git a/pkg/localcontroller/managers/types.go b/pkg/localcontroller/managers/types.go new file mode 100644 index 000000000..d7f0eb1b5 --- /dev/null +++ b/pkg/localcontroller/managers/types.go @@ -0,0 +1,39 @@ +/* +Copyright 2021 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package managers + +import ( + clienttype "github.com/kubeedge/sedna/pkg/localcontroller/gmclient" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" +) + +// FeatureManager defines feature managers +type FeatureManager interface { + // Start starts the managers + Start() error + + // GetName returns name of the managers + GetName() string + + // AddWorkerMessage dispatch the worker message to managers + AddWorkerMessage(message workertypes.MessageContent) + + // Insert includes gm message creation/updation + Insert(*clienttype.Message) error + + Delete(*clienttype.Message) error +} diff --git a/pkg/localcontroller/server/server.go b/pkg/localcontroller/server/server.go index ade5a4c9a..be6c3a7b4 100644 --- a/pkg/localcontroller/server/server.go +++ b/pkg/localcontroller/server/server.go @@ -25,7 +25,8 @@ import ( "github.com/kubeedge/sedna/cmd/sedna-lc/app/options" "github.com/kubeedge/sedna/pkg/localcontroller/common/constants" - "github.com/kubeedge/sedna/pkg/localcontroller/manager" + "github.com/kubeedge/sedna/pkg/localcontroller/managers" + workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker" ) // Server defines server @@ -37,7 +38,7 @@ type Server struct { // Resource defines resource type Resource struct { - Worker map[string]manager.WorkerMessage + Worker map[string]workertypes.MessageContent } // ResponseMessage defines send message to worker @@ -46,7 +47,7 @@ type ResponseMessage struct { Message string } -type featureManagerMap map[string]manager.FeatureManager +type featureManagerMap map[string]managers.FeatureManager // New creates a new LC server func New(options *options.LocalControllerOptions) *Server { @@ -59,7 +60,7 @@ func New(options *options.LocalControllerOptions) *Server { return &s } -func (s *Server) AddFeatureManager(m manager.FeatureManager) { +func (s *Server) AddFeatureManager(m managers.FeatureManager) { s.fmm[m.GetName()] = m } @@ -95,7 +96,7 @@ func (s *Server) reply(response *restful.Response, 
statusCode int, msg string) e func (s *Server) messageHandler(request *restful.Request, response *restful.Response) { var err error workerName := request.PathParameter("worker-name") - workerMessage := manager.WorkerMessage{} + workerMessage := workertypes.MessageContent{} err = request.ReadEntity(&workerMessage) if workerMessage.Name != workerName || err != nil { @@ -130,7 +131,7 @@ func (s *Server) messageHandler(request *restful.Request, response *restful.Resp // ListenAndServe starts server func (s *Server) ListenAndServe() { wsContainer := restful.NewContainer() - resource := Resource{map[string]manager.WorkerMessage{}} + resource := Resource{map[string]workertypes.MessageContent{}} s.Resource = &resource s.register(wsContainer) diff --git a/pkg/localcontroller/storage/storage.go b/pkg/localcontroller/storage/storage.go index 9866bd869..8855fc95f 100644 --- a/pkg/localcontroller/storage/storage.go +++ b/pkg/localcontroller/storage/storage.go @@ -23,6 +23,7 @@ import ( "path" "path/filepath" + "github.com/kubeedge/sedna/pkg/globalmanager/runtime" "github.com/kubeedge/sedna/pkg/localcontroller/util" ) @@ -31,14 +32,6 @@ const ( S3Prefix = "s3" // LocalPrefix defines that prefix of url is local host LocalPrefix = "" - // S3EndPoint is s3 endpoint of the storage service - S3Endpoint = "s3-endpoint" - // S3UseHTTPS determines whether to use HTTPS protocol - S3UseHTTPS = "s3-usehttps" - // AccessKeyId is access key id of the storage service - AccessKeyID = "ACCESS_KEY_ID" - // SecretAccessKey is secret access key of the storage service - SecretAccessKey = "SECRET_ACCESS_KEY" ) type Storage struct { @@ -124,22 +117,22 @@ func (s *Storage) SetCredential(credential string) error { return err } - endpoint, err := checkMapKeyExists(m, S3Endpoint) + endpoint, err := checkMapKeyExists(m, runtime.S3EndpointKey) if err != nil { return err } - useHTTPS, err := checkMapKeyExists(m, S3UseHTTPS) + useHTTPS, err := checkMapKeyExists(m, runtime.S3UseHTTPSKey) if err != nil { 
useHTTPS = "1" } - ak, err := checkMapKeyExists(m, AccessKeyID) + ak, err := checkMapKeyExists(m, runtime.AccessKeyID) if err != nil { return err } - sk, err := checkMapKeyExists(m, SecretAccessKey) + sk, err := checkMapKeyExists(m, runtime.SecretAccessKey) if err != nil { return err } diff --git a/pkg/localcontroller/worker/worker.go b/pkg/localcontroller/worker/worker.go new file mode 100644 index 000000000..a8e1d9a20 --- /dev/null +++ b/pkg/localcontroller/worker/worker.go @@ -0,0 +1,48 @@ +/* +Copyright 2021 The KubeEdge Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package worker + +// MessageContent defines the body content of message that comes from workers. 
+type MessageContent struct { + // Name is worker name + Name string `json:"name"` + Namespace string `json:"namespace"` + // OwnerName is job name + OwnerName string `json:"ownerName"` + // OwnerKind is kind of job + OwnerKind string `json:"ownerKind"` + // OwnerInfo is info about job + OwnerInfo map[string]interface{} `json:"ownerInfo"` + // Kind is worker phase, include train/eval/deploy + Kind string `json:"kind"` + // Status is worker status, include running/completed/failed + Status string `json:"status"` + // Results is the output of worker when it was completed + Results []map[string]interface{} `json:"results"` +} + +const ( + // MessageChannelCacheSize is size of worker message channel cache + MessageChannelCacheSize = 100 + + // ReadyStatus is the ready status about worker + ReadyStatus = "ready" + // CompletedStatus is the completed status about worker + CompletedStatus = "completed" + // FailedStatus is the failed status about worker + FailedStatus = "failed" +)