Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing Operator specific handling during a StudyJob run #387

Merged
merged 2 commits into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 55 additions & 39 deletions pkg/controller/studyjob/studyjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
batchv1beta "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -120,7 +122,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
if isFatalWatchError(err, TFJobWorker) {
return err
}

err = c.Watch(
&source.Kind{Type: &pytorchjobv1beta1.PyTorchJob{}},
&handler.EnqueueRequestForOwner{
Expand Down Expand Up @@ -296,15 +297,15 @@ func (r *ReconcileStudyJobController) checkGoal(instance *katibv1alpha1.StudyJob
return goal, nil
}

func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, w *katibv1alpha1.WorkerCondition) error {
wid := w.WorkerID
obj := createWorkerJobObj(w.Kind)
func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, wid string, wkind *schema.GroupVersionKind) error {
nname := types.NamespacedName{Namespace: ns, Name: wid}
var wretain, mcretain bool = false, false
if instance.Spec.WorkerSpec != nil {
wretain = instance.Spec.WorkerSpec.Retain
}
if !wretain {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(*wkind)
joberr := r.Client.Get(context.TODO(), nname, obj)
if joberr == nil {
if err := r.Delete(context.TODO(), obj, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
Expand Down Expand Up @@ -393,46 +394,56 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins
return update, nil
}

func (r *ReconcileStudyJobController) getJobWorkerStatus(w *katibv1alpha1.WorkerCondition, ns string) WorkerStatus {
runtimejob := createWorkerJobObj(w.Kind)
nname := types.NamespacedName{Namespace: ns, Name: w.WorkerID}
joberr := r.Client.Get(context.TODO(), nname, runtimejob)
if joberr != nil {
return WorkerStatus{}
}
func (r *ReconcileStudyJobController) getJobWorkerStatus(ns string, wid string, wkind *schema.GroupVersionKind) WorkerStatus {
nname := types.NamespacedName{Namespace: ns, Name: wid}
var state katibapi.State = katibapi.State_RUNNING
var cpTime *metav1.Time
switch w.Kind {
switch wkind.Kind {

case DefaultJobWorker:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this case still needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But the string values are different currently: 'Complete' in https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/api/batch/v1/types.go#L170 vs 'Succeeded' in https://github.com/kubeflow/tf-operator/blob/master/pkg/apis/common/v1beta2/common_types.go#L121

In general, since 'job' is a built-in K8s Kind, should we care about it matching with our JobCondition? I feel that the only necessary condition is that all custom job operators share the same JobStatus type.

Or other option is, we can change the common JobConditionType string to match the batch Job's JobConditionType. But new developers might not be aware of this assumption and may break the logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok for this version. One thing that we have discussed in the past is moving the common types into its own repo. If we go down that route, it will be easier to change then. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. There is a discussion in kubeflow/mpi-operator#92 (comment)

job := runtimejob.(*batchv1.Job)
var job batchv1.Job
if err := r.Client.Get(context.TODO(), nname, &job); err != nil {
log.Printf("Client Get error %v for %v", err, nname)
return WorkerStatus{}
}
if job.Status.Active == 0 && job.Status.Succeeded > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we should be able to use similar logic here to check for job completion: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/api/batch/v1/generated.proto#L158

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see above comment

state = katibapi.State_COMPLETED
} else if job.Status.Failed > 0 {
state = katibapi.State_ERROR
}
cpTime = job.Status.CompletionTime
case TFJobWorker:
job := runtimejob.(*tfjobv1beta1.TFJob)
if len(job.Status.Conditions) > 0 {
lc := job.Status.Conditions[len(job.Status.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
}

default:
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(*wkind)
if err := r.Client.Get(context.TODO(), nname, u); err != nil {
log.Printf("Client Get error %v for %v", err, nname)
return WorkerStatus{}
}
cpTime = job.Status.CompletionTime
case PyTorchJobWorker:
job := runtimejob.(*pytorchjobv1beta1.PyTorchJob)
if len(job.Status.Conditions) > 0 {
lc := job.Status.Conditions[len(job.Status.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
status, ok, unerr := unstructured.NestedFieldCopy(u.Object, "status")

if ok {
statusMap := status.(map[string]interface{})
jobStatus := commonv1beta1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Printf("Error in converting unstructured to status: %v ", err)
return WorkerStatus{}
}
if len(jobStatus.Conditions) > 0 {
lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
}
}
cpTime = jobStatus.CompletionTime

} else if unerr != nil {
log.Printf("Error in getting Job Status from unstructured: %v", unerr)
return WorkerStatus{}
}
cpTime = job.Status.CompletionTime
}
return WorkerStatus{
CompletionTime: cpTime,
Expand All @@ -459,19 +470,24 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
}
defer conn.Close()
c := katibapi.NewManagerClient(conn)
wkind, err := getWorkerKind(instance.Spec.WorkerSpec)
if err != nil {
log.Printf("getWorkerKind error %v", err)
return false, err
}
for i, t := range instance.Status.Trials {
for j, w := range t.WorkerList {
if w.Condition == katibv1alpha1.ConditionCompleted || w.Condition == katibv1alpha1.ConditionFailed {
if w.ObjectiveValue == nil && w.Condition == katibv1alpha1.ConditionCompleted {
cwids = append(cwids, w.WorkerID)
}
if err := r.deleteWorkerResources(instance, ns, &w); err != nil {
if err := r.deleteWorkerResources(instance, ns, w.WorkerID, wkind); err != nil {
return false, err
}
continue
}
nextSuggestionSchedule = false
js := r.getJobWorkerStatus(&w, ns)
js := r.getJobWorkerStatus(ns, w.WorkerID, wkind)
update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j)
}
}
Expand Down Expand Up @@ -545,7 +561,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
return true, err
}
for _, t := range trials {
wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind, false)
wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind.Kind, false)
if err != nil {
log.Printf("Spawn worker error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
Expand All @@ -558,7 +574,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
WorkerList: []katibv1alpha1.WorkerCondition{
katibv1alpha1.WorkerCondition{
WorkerID: wid,
Kind: wkind,
Kind: wkind.Kind,
Condition: katibv1alpha1.ConditionCreated,
StartTime: metav1.Now(),
},
Expand Down Expand Up @@ -596,12 +612,12 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ
return "", err
}
BUFSIZE := 1024
job := createWorkerJobObj(wkind)
job := &unstructured.Unstructured{}
if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil {
log.Printf("Yaml decode error %v", err)
return "", err
}
if err := controllerutil.SetControllerReference(instance, job.(metav1.Object), r.scheme); err != nil {
if err := controllerutil.SetControllerReference(instance, job, r.scheme); err != nil {
log.Printf("SetControllerReference error %v", err)
return "", err
}
Expand All @@ -621,7 +637,7 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp
log.Printf("getWorkerKind error %v", err)
return err
}
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs)
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind.Kind, namespace, mcs)
if err != nil {
log.Printf("getMetricsCollectorManifest error %v", err)
return err
Expand Down
53 changes: 23 additions & 30 deletions pkg/controller/studyjob/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,14 @@ import (

katibapi "github.com/kubeflow/katib/pkg/api"
katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1"
pytorchjobv1beta1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1"
tfjobv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta1"

batchv1 "k8s.io/api/batch/v1"
batchv1beta "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
)

func createWorkerJobObj(kind string) runtime.Object {
switch kind {
case DefaultJobWorker:
return &batchv1.Job{}
case TFJobWorker:
return &tfjobv1beta1.TFJob{}
case PyTorchJobWorker:
return &pytorchjobv1beta1.PyTorchJob{}
}
return nil
}

func validateWorkerResource(wkind string) error {
for _, crd := range invalidCRDResources {
if crd == wkind {
Expand All @@ -62,7 +47,7 @@ func isFatalWatchError(err error, job string) bool {
}
}

func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) {
func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (*schema.GroupVersionKind, error) {
var typeChecker interface{}
BUFSIZE := 1024
_, m, err := getWorkerManifest(
Expand All @@ -78,30 +63,39 @@ func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) {
true,
)
if err != nil {
return "", err
return nil, err
}
if err := k8syaml.NewYAMLOrJSONDecoder(m, BUFSIZE).Decode(&typeChecker); err != nil {
log.Printf("Yaml decode validation error %v", err)
return "", err
return nil, err
}
tcMap, ok := typeChecker.(map[string]interface{})
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkind, ok := tcMap["kind"]
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkindS, ok := wkind.(string)
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
apiVersion, ok := tcMap["apiVersion"]
if !ok {
return nil, fmt.Errorf("Cannot get apiVersion of worker %v", typeChecker)
}
apiVersionS, ok := apiVersion.(string)
if !ok {
return nil, fmt.Errorf("Cannot get apiVersion of worker %v", typeChecker)
}
for _, kind := range ValidWorkerKindList {
if kind == wkindS {
return wkindS, validateWorkerResource(kind)
workerGVK := schema.FromAPIVersionAndKind(apiVersionS, kind)
return &workerGVK, validateWorkerResource(kind)
}
}
return "", fmt.Errorf("Invalid kind of worker %v", typeChecker)
return nil, fmt.Errorf("Invalid kind of worker %v", typeChecker)
}

func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error {
Expand All @@ -125,27 +119,26 @@ func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error {
ParameterSet: []*katibapi.Parameter{},
},
instance.Spec.WorkerSpec,
wkind,
wkind.Kind,
namespace,
true,
)
if err != nil {
return err
}

job := createWorkerJobObj(wkind)
job := &unstructured.Unstructured{}
if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil {
log.Printf("Yaml decode error %v", err)
return err
}

metav1Job := job.(metav1.Object)
if metav1Job.GetNamespace() != namespace || metav1Job.GetName() != workerID {
if job.GetNamespace() != namespace || job.GetName() != workerID {
return fmt.Errorf("Invalid worker template.")
}

var mcjob batchv1beta.CronJob
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, instance.Spec.MetricsCollectorSpec)
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind.Kind, namespace, instance.Spec.MetricsCollectorSpec)
if err != nil {
log.Printf("getMetricsCollectorManifest error %v", err)
return err
Expand Down