Skip to content

Commit

Permalink
ignore tfjob/pytorch job if corresponding CRD not created (#335)
Browse files Browse the repository at this point in the history
* ignore tfjob/pytorch job if corresponding CRD not created

* update log message

* only ignore NoMatchError when watch CRD

* refactor func name for watch error
  • Loading branch information
hougangliu authored and k8s-ci-robot committed Jan 25, 2019
1 parent c67892f commit f4026e4
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 46 deletions.
44 changes: 0 additions & 44 deletions pkg/controller/studyjob/manifest_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"context"
"fmt"
"log"
"text/template"

katibapi "github.com/kubeflow/katib/pkg/api"
Expand All @@ -27,51 +26,8 @@ import (
"github.com/kubeflow/katib/pkg/manager/studyjobclient"

"k8s.io/apimachinery/pkg/util/uuid"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
)

func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) {
var typeChecker interface{}
BUFSIZE := 1024
_, m, err := getWorkerManifest(
nil,
"validation",
&katibapi.Trial{
TrialId: "validation",
ParameterSet: []*katibapi.Parameter{},
},
workerSpec,
"",
"",
true,
)
if err != nil {
return "", err
}
if err := k8syaml.NewYAMLOrJSONDecoder(m, BUFSIZE).Decode(&typeChecker); err != nil {
log.Printf("Yaml decode validation error %v", err)
return "", err
}
tcMap, ok := typeChecker.(map[string]interface{})
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkind, ok := tcMap["kind"]
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkindS, ok := wkind.(string)
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
for _, kind := range ValidWorkerKindList {
if kind == wkindS {
return wkindS, nil
}
}
return "", fmt.Errorf("Invalid kind of worker %v", typeChecker)
}

func getWorkerManifest(c katibapi.ManagerClient, studyID string, trial *katibapi.Trial, workerSpec *katibv1alpha1.WorkerSpec, kind string, ns string, dryrun bool) (string, *bytes.Buffer, error) {
var wtp *template.Template = nil
var err error
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/studyjob/studyjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ const (
cleanDataFinalizer = "clean-studyjob-data"
)

var (
invalidCRDResources [] string
)

/**
* USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller
* business logic. Delete these comments after modifying this file.*
Expand Down Expand Up @@ -116,7 +120,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
IsController: true,
OwnerType: &katibv1alpha1.StudyJob{},
})
if err != nil {
if isFatalWatchError(err, TFJobWorker) {
return err
}

Expand All @@ -126,7 +130,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
IsController: true,
OwnerType: &katibv1alpha1.StudyJob{},
})
if err != nil {
if isFatalWatchError(err, PyTorchJobWorker) {
return err
}

Expand Down
65 changes: 65 additions & 0 deletions pkg/controller/studyjob/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

batchv1 "k8s.io/api/batch/v1"
batchv1beta "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
Expand All @@ -39,6 +40,70 @@ func createWorkerJobObj(kind string) runtime.Object {
return nil
}

func validateWorkerResource(wkind string) error {
for _, crd := range invalidCRDResources {
if crd == wkind {
return fmt.Errorf("Cannot support %s; Please install the CRD and restart studyjob-controller", wkind)
}
}
return nil
}

func isFatalWatchError(err error, job string) bool {
if err == nil {
return false
}
if meta.IsNoMatchError(err) {
invalidCRDResources = append(invalidCRDResources, job)
log.Printf("Fail to watch CRD: %v; Please install the CRD and restart studyjob-controller to support %s worker", err, job)
return false
} else {
return true
}
}

func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) {
var typeChecker interface{}
BUFSIZE := 1024
_, m, err := getWorkerManifest(
nil,
"validation",
&katibapi.Trial{
TrialId: "validation",
ParameterSet: []*katibapi.Parameter{},
},
workerSpec,
"",
"",
true,
)
if err != nil {
return "", err
}
if err := k8syaml.NewYAMLOrJSONDecoder(m, BUFSIZE).Decode(&typeChecker); err != nil {
log.Printf("Yaml decode validation error %v", err)
return "", err
}
tcMap, ok := typeChecker.(map[string]interface{})
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkind, ok := tcMap["kind"]
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkindS, ok := wkind.(string)
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
for _, kind := range ValidWorkerKindList {
if kind == wkindS {
return wkindS, validateWorkerResource(kind)
}
}
return "", fmt.Errorf("Invalid kind of worker %v", typeChecker)
}

func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error {
if instance.Spec.SuggestionSpec == nil {
return fmt.Errorf("No Spec.SuggestionSpec specified.")
Expand Down

0 comments on commit f4026e4

Please sign in to comment.