diff --git a/pkg/controller/studyjob/studyjob_controller.go b/pkg/controller/studyjob/studyjob_controller.go index 1945160c877..470365e3692 100644 --- a/pkg/controller/studyjob/studyjob_controller.go +++ b/pkg/controller/studyjob/studyjob_controller.go @@ -35,7 +35,9 @@ import ( batchv1beta "k8s.io/api/batch/v1beta1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" k8syaml "k8s.io/apimachinery/pkg/util/yaml" "sigs.k8s.io/controller-runtime/pkg/client" @@ -120,7 +122,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { if isFatalWatchError(err, TFJobWorker) { return err } - err = c.Watch( &source.Kind{Type: &pytorchjobv1beta1.PyTorchJob{}}, &handler.EnqueueRequestForOwner{ @@ -296,15 +297,15 @@ func (r *ReconcileStudyJobController) checkGoal(instance *katibv1alpha1.StudyJob return goal, nil } -func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, w *katibv1alpha1.WorkerCondition) error { - wid := w.WorkerID - obj := createWorkerJobObj(w.Kind) +func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, wid string, wkind *schema.GroupVersionKind) error { nname := types.NamespacedName{Namespace: ns, Name: wid} var wretain, mcretain bool = false, false if instance.Spec.WorkerSpec != nil { wretain = instance.Spec.WorkerSpec.Retain } if !wretain { + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(*wkind) joberr := r.Client.Get(context.TODO(), nname, obj) if joberr == nil { if err := r.Delete(context.TODO(), obj, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil { @@ -393,46 +394,56 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins return update, nil } -func (r *ReconcileStudyJobController) getJobWorkerStatus(w *katibv1alpha1.WorkerCondition, ns string) WorkerStatus { - runtimejob := createWorkerJobObj(w.Kind) - nname := types.NamespacedName{Namespace: ns, Name: w.WorkerID} - joberr := r.Client.Get(context.TODO(), nname, runtimejob) - if joberr != nil { - return WorkerStatus{} - } +func (r *ReconcileStudyJobController) getJobWorkerStatus(ns string, wid string, wkind *schema.GroupVersionKind) WorkerStatus { + nname := types.NamespacedName{Namespace: ns, Name: wid} var state katibapi.State = katibapi.State_RUNNING var cpTime *metav1.Time - switch w.Kind { + switch wkind.Kind { + case DefaultJobWorker: - job := runtimejob.(*batchv1.Job) + var job batchv1.Job + if err := r.Client.Get(context.TODO(), nname, &job); err != nil { + log.Printf("Client Get error %v for %v", err, nname) + return WorkerStatus{} + } if job.Status.Active == 0 && job.Status.Succeeded > 0 { state = katibapi.State_COMPLETED } else if job.Status.Failed > 0 { state = katibapi.State_ERROR } cpTime = job.Status.CompletionTime - case TFJobWorker: - job := runtimejob.(*tfjobv1beta1.TFJob) - if len(job.Status.Conditions) > 0 { - lc := job.Status.Conditions[len(job.Status.Conditions)-1] - if lc.Type == commonv1beta1.JobSucceeded { - state = katibapi.State_COMPLETED - } else if lc.Type == commonv1beta1.JobFailed { - state = katibapi.State_ERROR - } + + default: + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(*wkind) + if err := r.Client.Get(context.TODO(), nname, u); err != nil { + log.Printf("Client Get error %v for %v", err, nname) + return WorkerStatus{} } - cpTime = job.Status.CompletionTime - case PyTorchJobWorker: - job := runtimejob.(*pytorchjobv1beta1.PyTorchJob) - if len(job.Status.Conditions) > 0 { - lc := job.Status.Conditions[len(job.Status.Conditions)-1] - if lc.Type == commonv1beta1.JobSucceeded { - state = katibapi.State_COMPLETED - } else if lc.Type == commonv1beta1.JobFailed { - state = katibapi.State_ERROR + status, ok, unerr := unstructured.NestedFieldCopy(u.Object, "status") + + if ok { + statusMap := status.(map[string]interface{}) + jobStatus := commonv1beta1.JobStatus{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) + if err != nil { + log.Printf("Error in converting unstructured to status: %v ", err) + return WorkerStatus{} } + if len(jobStatus.Conditions) > 0 { + lc := jobStatus.Conditions[len(jobStatus.Conditions)-1] + if lc.Type == commonv1beta1.JobSucceeded { + state = katibapi.State_COMPLETED + } else if lc.Type == commonv1beta1.JobFailed { + state = katibapi.State_ERROR + } + } + cpTime = jobStatus.CompletionTime + + } else if unerr != nil { + log.Printf("Error in getting Job Status from unstructured: %v", unerr) + return WorkerStatus{} } - cpTime = job.Status.CompletionTime } return WorkerStatus{ CompletionTime: cpTime, @@ -459,19 +470,24 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ } defer conn.Close() c := katibapi.NewManagerClient(conn) + wkind, err := getWorkerKind(instance.Spec.WorkerSpec) + if err != nil { + log.Printf("getWorkerKind error %v", err) + return false, err + } for i, t := range instance.Status.Trials { for j, w := range t.WorkerList { if w.Condition == katibv1alpha1.ConditionCompleted || w.Condition == katibv1alpha1.ConditionFailed { if w.ObjectiveValue == nil && w.Condition == katibv1alpha1.ConditionCompleted { cwids = append(cwids, w.WorkerID) } - if err := r.deleteWorkerResources(instance, ns, &w); err != nil { + if err := r.deleteWorkerResources(instance, ns, w.WorkerID, wkind); err != nil { return false, err } continue } nextSuggestionSchedule = false - js := r.getJobWorkerStatus(&w, ns) + js := r.getJobWorkerStatus(ns, w.WorkerID, wkind) update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j) } } @@ -545,7 +561,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha return true, err } for _, t := range trials { - wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind, false) + wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind.Kind, false) if err != nil { log.Printf("Spawn worker error %v", err) instance.Status.Condition = katibv1alpha1.ConditionFailed @@ -558,7 +574,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha WorkerList: []katibv1alpha1.WorkerCondition{ katibv1alpha1.WorkerCondition{ WorkerID: wid, - Kind: wkind, + Kind: wkind.Kind, Condition: katibv1alpha1.ConditionCreated, StartTime: metav1.Now(), }, @@ -596,12 +612,12 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ return "", err } BUFSIZE := 1024 - job := createWorkerJobObj(wkind) + job := &unstructured.Unstructured{} if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil { log.Printf("Yaml decode error %v", err) return "", err } - if err := controllerutil.SetControllerReference(instance, job.(metav1.Object), r.scheme); err != nil { + if err := controllerutil.SetControllerReference(instance, job, r.scheme); err != nil { log.Printf("SetControllerReference error %v", err) return "", err } @@ -621,7 +637,7 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp log.Printf("getWorkerKind error %v", err) return err } - mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs) + mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind.Kind, namespace, mcs) if err != nil { log.Printf("getMetricsCollectorManifest error %v", err) return err diff --git a/pkg/controller/studyjob/utils.go b/pkg/controller/studyjob/utils.go index b0fae6e6d9c..339e04db383 100644 --- a/pkg/controller/studyjob/utils.go +++ b/pkg/controller/studyjob/utils.go @@ -17,29 +17,14 @@ import ( katibapi "github.com/kubeflow/katib/pkg/api" katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1" - pytorchjobv1beta1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1" - tfjobv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta1" - batchv1 "k8s.io/api/batch/v1" batchv1beta "k8s.io/api/batch/v1beta1" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" k8syaml "k8s.io/apimachinery/pkg/util/yaml" ) -func createWorkerJobObj(kind string) runtime.Object { - switch kind { - case DefaultJobWorker: - return &batchv1.Job{} - case TFJobWorker: - return &tfjobv1beta1.TFJob{} - case PyTorchJobWorker: - return &pytorchjobv1beta1.PyTorchJob{} - } - return nil -} - func validateWorkerResource(wkind string) error { for _, crd := range invalidCRDResources { if crd == wkind { @@ -62,7 +47,7 @@ func isFatalWatchError(err error, job string) bool { } } -func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) { +func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (*schema.GroupVersionKind, error) { var typeChecker interface{} BUFSIZE := 1024 _, m, err := getWorkerManifest( @@ -78,30 +63,39 @@ func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) { true, ) if err != nil { - return "", err + return nil, err } if err := k8syaml.NewYAMLOrJSONDecoder(m, BUFSIZE).Decode(&typeChecker); err != nil { log.Printf("Yaml decode validation error %v", err) - return "", err + return nil, err } tcMap, ok := typeChecker.(map[string]interface{}) if !ok { - return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) + return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker) } wkind, ok := tcMap["kind"] if !ok { - return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) + return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker) } wkindS, ok := wkind.(string) if !ok { - return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) + return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker) + } + apiVersion, ok := tcMap["apiVersion"] + if !ok { + return nil, fmt.Errorf("Cannot get apiVersion of worker %v", typeChecker) + } + apiVersionS, ok := apiVersion.(string) + if !ok { + return nil, fmt.Errorf("Cannot get apiVersion of worker %v", typeChecker) } for _, kind := range ValidWorkerKindList { if kind == wkindS { - return wkindS, validateWorkerResource(kind) + workerGVK := schema.FromAPIVersionAndKind(apiVersionS, kind) + return &workerGVK, validateWorkerResource(kind) } } - return "", fmt.Errorf("Invalid kind of worker %v", typeChecker) + return nil, fmt.Errorf("Invalid kind of worker %v", typeChecker) } func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error { @@ -125,7 +119,7 @@ func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error { ParameterSet: []*katibapi.Parameter{}, }, instance.Spec.WorkerSpec, - wkind, + wkind.Kind, namespace, true, ) @@ -133,19 +127,18 @@ func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error { return err } - job := createWorkerJobObj(wkind) + job := &unstructured.Unstructured{} if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil { log.Printf("Yaml decode error %v", err) return err } - metav1Job := job.(metav1.Object) - if metav1Job.GetNamespace() != namespace || metav1Job.GetName() != workerID { + if job.GetNamespace() != namespace || job.GetName() != workerID { return fmt.Errorf("Invalid worker template.") } var mcjob batchv1beta.CronJob - mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, instance.Spec.MetricsCollectorSpec) + mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind.Kind, namespace, instance.Spec.MetricsCollectorSpec) if err != nil { log.Printf("getMetricsCollectorManifest error %v", err) return err