diff --git a/examples/v1beta1/bayesianoptimization-example.yaml b/examples/v1beta1/bayesianoptimization-example.yaml index ae9faf5bc9a..7d7b604ad56 100644 --- a/examples/v1beta1/bayesianoptimization-example.yaml +++ b/examples/v1beta1/bayesianoptimization-example.yaml @@ -39,6 +39,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/cmaes-example.yaml b/examples/v1beta1/cmaes-example.yaml index 9f1ad36df5a..e0b54aa6ff5 100644 --- a/examples/v1beta1/cmaes-example.yaml +++ b/examples/v1beta1/cmaes-example.yaml @@ -36,6 +36,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/custom-metricscollector-example.yaml b/examples/v1beta1/custom-metricscollector-example.yaml index 53cddc217c7..c56456a2051 100644 --- a/examples/v1beta1/custom-metricscollector-example.yaml +++ b/examples/v1beta1/custom-metricscollector-example.yaml @@ -52,6 +52,7 @@ spec: min: "0.3" max: "0.7" trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/file-metricscollector-example.yaml b/examples/v1beta1/file-metricscollector-example.yaml index 4b2d701c008..c9c43fd435b 100644 --- a/examples/v1beta1/file-metricscollector-example.yaml +++ b/examples/v1beta1/file-metricscollector-example.yaml @@ -39,6 +39,7 @@ spec: min: "0.3" max: "0.7" trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/fpga/xgboost-example.yaml b/examples/v1beta1/fpga/xgboost-example.yaml index 365606fea36..f5bc5e1a95a 100644 --- a/examples/v1beta1/fpga/xgboost-example.yaml +++ 
b/examples/v1beta1/fpga/xgboost-example.yaml @@ -35,6 +35,7 @@ spec: name: subsample parameterType: double trialTemplate: + primaryContainerName: training-container trialParameters: - name: alpha description: L1 regularization term on weights diff --git a/examples/v1beta1/grid-example.yaml b/examples/v1beta1/grid-example.yaml index 07e7f94f94a..5a53e731937 100644 --- a/examples/v1beta1/grid-example.yaml +++ b/examples/v1beta1/grid-example.yaml @@ -37,6 +37,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/hyperband-example.yaml b/examples/v1beta1/hyperband-example.yaml index 1316cd27248..dc79d1fb0db 100644 --- a/examples/v1beta1/hyperband-example.yaml +++ b/examples/v1beta1/hyperband-example.yaml @@ -46,6 +46,7 @@ spec: min: "20" max: "20" trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/metric-strategy-example.yaml b/examples/v1beta1/metric-strategy-example.yaml index 9fd746d6b8d..3174437f54b 100644 --- a/examples/v1beta1/metric-strategy-example.yaml +++ b/examples/v1beta1/metric-strategy-example.yaml @@ -41,6 +41,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/nas/darts-example-cpu.yaml b/examples/v1beta1/nas/darts-example-cpu.yaml index e43d27f48ae..fd295ae6fac 100644 --- a/examples/v1beta1/nas/darts-example-cpu.yaml +++ b/examples/v1beta1/nas/darts-example-cpu.yaml @@ -40,6 +40,7 @@ spec: list: - "3" trialTemplate: + primaryContainerName: training-container trialParameters: - name: algorithmSettings description: Algorithm settings of DARTS Experiment diff --git a/examples/v1beta1/nas/darts-example-gpu.yaml 
b/examples/v1beta1/nas/darts-example-gpu.yaml index d5a72347af5..dbf9da54190 100644 --- a/examples/v1beta1/nas/darts-example-gpu.yaml +++ b/examples/v1beta1/nas/darts-example-gpu.yaml @@ -57,6 +57,7 @@ spec: - "3" - operationType: skip_connection trialTemplate: + primaryContainerName: training-container trialParameters: - name: algorithmSettings description: Algorithm settings of DARTS Experiment diff --git a/examples/v1beta1/nas/enas-example-cpu.yaml b/examples/v1beta1/nas/enas-example-cpu.yaml index b979deb948e..46941f0f9f3 100644 --- a/examples/v1beta1/nas/enas-example-cpu.yaml +++ b/examples/v1beta1/nas/enas-example-cpu.yaml @@ -123,6 +123,7 @@ spec: max: "3" step: "1" trialTemplate: + primaryContainerName: training-container trialParameters: - name: neuralNetworkArchitecture description: NN architecture contains operations ID on each NN layer and skip connections between layers diff --git a/examples/v1beta1/nas/enas-example-gpu.yaml b/examples/v1beta1/nas/enas-example-gpu.yaml index 938c263e844..0e23ea34fe0 100644 --- a/examples/v1beta1/nas/enas-example-gpu.yaml +++ b/examples/v1beta1/nas/enas-example-gpu.yaml @@ -120,6 +120,7 @@ spec: max: "3" step: "1" trialTemplate: + primaryContainerName: training-container trialParameters: - name: neuralNetworkArchitecture description: NN architecture contains operations ID on each NN layer and skip connections between layers diff --git a/examples/v1beta1/pytorchjob-example.yaml b/examples/v1beta1/pytorchjob-example.yaml index 0bf9bcebbff..3376d861b73 100644 --- a/examples/v1beta1/pytorchjob-example.yaml +++ b/examples/v1beta1/pytorchjob-example.yaml @@ -25,6 +25,7 @@ spec: min: "0.5" max: "0.9" trialTemplate: + primaryContainerName: pytorch trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/random-example.yaml b/examples/v1beta1/random-example.yaml index 0a763d5a34c..33231ea925a 100644 --- a/examples/v1beta1/random-example.yaml +++ 
b/examples/v1beta1/random-example.yaml @@ -36,6 +36,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/resume-experiment/from-volume-resume.yaml b/examples/v1beta1/resume-experiment/from-volume-resume.yaml index 44d83410e17..3afc4c5c16d 100644 --- a/examples/v1beta1/resume-experiment/from-volume-resume.yaml +++ b/examples/v1beta1/resume-experiment/from-volume-resume.yaml @@ -37,6 +37,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/resume-experiment/never-resume.yaml b/examples/v1beta1/resume-experiment/never-resume.yaml index 5452c829664..bd4edf6a5ce 100644 --- a/examples/v1beta1/resume-experiment/never-resume.yaml +++ b/examples/v1beta1/resume-experiment/never-resume.yaml @@ -37,6 +37,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/tfjob-example.yaml b/examples/v1beta1/tfjob-example.yaml index 153cf50839c..5ec0090214a 100644 --- a/examples/v1beta1/tfjob-example.yaml +++ b/examples/v1beta1/tfjob-example.yaml @@ -32,6 +32,7 @@ spec: min: "100" max: "200" trialTemplate: + primaryContainerName: tensorflow trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/examples/v1beta1/tpe-example.yaml b/examples/v1beta1/tpe-example.yaml index 63291cb1a2e..456f3b0f8e1 100644 --- a/examples/v1beta1/tpe-example.yaml +++ b/examples/v1beta1/tpe-example.yaml @@ -36,6 +36,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git 
a/examples/v1beta1/trial-metadata-substitution.yaml b/examples/v1beta1/trial-metadata-substitution.yaml index a5e4206af5d..c165e4229cd 100644 --- a/examples/v1beta1/trial-metadata-substitution.yaml +++ b/examples/v1beta1/trial-metadata-substitution.yaml @@ -23,6 +23,7 @@ spec: min: "0.01" max: "0.03" trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/manifests/v1beta1/katib-controller/katib-controller.yaml b/manifests/v1beta1/katib-controller/katib-controller.yaml index 130d185fc7b..4dfb01fd445 100644 --- a/manifests/v1beta1/katib-controller/katib-controller.yaml +++ b/manifests/v1beta1/katib-controller/katib-controller.yaml @@ -25,6 +25,9 @@ spec: command: ["./katib-controller"] args: - "--webhook-port=8443" + - "--trial-resources=Job.v1.batch" + - "--trial-resources=TFJob.v1.kubeflow.org" + - "--trial-resources=PyTorchJob.v1.kubeflow.org" - "--trial-resources=MPIJob.v1.kubeflow.org" ports: - containerPort: 8443 diff --git a/pkg/apis/controller/experiments/v1beta1/constants.go b/pkg/apis/controller/experiments/v1beta1/constants.go index bb94e788d49..073660db2bb 100644 --- a/pkg/apis/controller/experiments/v1beta1/constants.go +++ b/pkg/apis/controller/experiments/v1beta1/constants.go @@ -16,15 +16,32 @@ limitations under the License. package v1beta1 const ( - // Default value of Spec.ParallelTrialCount + // DefaultTrialParallelCount is the default value of spec.parallelTrialCount. DefaultTrialParallelCount = 3 - // Default value of Spec.ConfigMapName for Trial template + // DefaultTrialConfigMapName is the default value of spec.trialTemplate.configMapName for Trial template. DefaultTrialConfigMapName = "trial-template" - // Default value of Spec.TemplatePath + // DefaultTrialTemplatePath is the default value of spec.trialTemplate.TemplatePath. 
DefaultTrialTemplatePath = "defaultTrialTemplate.yaml" - // Default value of Spec.DefaultResumePolicy + // DefaultResumePolicy is the default value of spec.resumePolicy. DefaultResumePolicy = LongRunning + + // DefaultJobSuccessCondition is the default value of spec.trialTemplate.successCondition for Job. + DefaultJobSuccessCondition = "status.conditions.#(type==\"Complete\")#|#(status==\"True\")#" + + // DefaultJobFailureCondition is the default value of spec.trialTemplate.failureCondition for Job. + DefaultJobFailureCondition = "status.conditions.#(type==\"Failed\")#|#(status==\"True\")#" + + // DefaultKubeflowJobSuccessCondition is the default value of spec.trialTemplate.successCondition for Kubeflow Job. + DefaultKubeflowJobSuccessCondition = "status.conditions.#(type==\"Succeeded\")#|#(status==\"True\")#" + + // DefaultKubeflowJobFailureCondition is the default value of spec.trialTemplate.failureCondition for Kubeflow Job. + DefaultKubeflowJobFailureCondition = "status.conditions.#(type==\"Failed\")#|#(status==\"True\")#" +) + +var ( + // DefaultKubeflowJobPrimaryPodLabels is the default value of spec.trialTemplate.primaryPodLabels for Kubeflow Job. 
+ DefaultKubeflowJobPrimaryPodLabels = map[string]string{"job-role": "master"} ) diff --git a/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go b/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go index 77ba9f8a124..cf08d0e2c0e 100644 --- a/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go +++ b/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go @@ -127,6 +127,30 @@ func (e *Experiment) setDefaultTrialTemplate() { }, } } + + // Set default values for Job, TFJob and PyTorchJob if TrialSpec is not nil + if t.TrialSource.TrialSpec != nil { + jobKind := t.TrialSource.TrialSpec.GetKind() + if jobKind == consts.JobKindJob { + if t.SuccessCondition == "" { + t.SuccessCondition = DefaultJobSuccessCondition + } + if t.FailureCondition == "" { + t.FailureCondition = DefaultJobFailureCondition + } + } else if jobKind == consts.JobKindTF || jobKind == consts.JobKindPyTorch { + if t.SuccessCondition == "" { + t.SuccessCondition = DefaultKubeflowJobSuccessCondition + } + if t.FailureCondition == "" { + t.FailureCondition = DefaultKubeflowJobFailureCondition + } + // For Kubeflow Job also set default PrimaryPodLabels + if len(t.PrimaryPodLabels) == 0 { + t.PrimaryPodLabels = DefaultKubeflowJobPrimaryPodLabels + } + } + } e.Spec.TrialTemplate = t } diff --git a/pkg/apis/controller/experiments/v1beta1/experiment_types.go b/pkg/apis/controller/experiments/v1beta1/experiment_types.go index 0721af52466..dc945f9838b 100644 --- a/pkg/apis/controller/experiments/v1beta1/experiment_types.go +++ b/pkg/apis/controller/experiments/v1beta1/experiment_types.go @@ -208,7 +208,8 @@ type TrialTemplate struct { // List of parameters that are used in trial template TrialParameters []TrialParameterSpec `json:"trialParameters,omitempty"` - // Labels that determines if pod needs to be injected by Katib sidecar container + // Labels that determine if pod needs to be injected by Katib sidecar container. 
+ // If PrimaryPodLabels is omitted, metrics collector wraps all Trial's pods. PrimaryPodLabels map[string]string `json:"primaryPodLabels,omitempty"` // Name of training container where actual model training is running diff --git a/pkg/controller.v1beta1/consts/const.go b/pkg/controller.v1beta1/consts/const.go index 598ce9d8326..cf01c923606 100644 --- a/pkg/controller.v1beta1/consts/const.go +++ b/pkg/controller.v1beta1/consts/const.go @@ -110,11 +110,6 @@ const ( // JobKindPyTorch is the kind of PyTorchJob. JobKindPyTorch = "PyTorchJob" - // built-in JobRoles - JobRole = "job-role" - JobRoleTF = "tf-job-role" - JobRolePyTorch = "pytorch-job-role" - // AnnotationIstioSidecarInjectName is the annotation of Istio Sidecar AnnotationIstioSidecarInjectName = "sidecar.istio.io/inject" diff --git a/pkg/controller.v1beta1/experiment/experiment_consts.go b/pkg/controller.v1beta1/experiment/experiment_consts.go deleted file mode 100644 index 306f54aea3e..00000000000 --- a/pkg/controller.v1beta1/experiment/experiment_consts.go +++ /dev/null @@ -1,5 +0,0 @@ -package experiment - -const ( - ReconcileFailedReason = "ReconcileFailed" -) diff --git a/pkg/controller.v1beta1/experiment/experiment_controller.go b/pkg/controller.v1beta1/experiment/experiment_controller.go index 865bc6a3764..bd13907210e 100644 --- a/pkg/controller.v1beta1/experiment/experiment_controller.go +++ b/pkg/controller.v1beta1/experiment/experiment_controller.go @@ -240,7 +240,7 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re if err != nil { logger.Error(err, "Reconcile experiment error") r.recorder.Eventf(instance, - corev1.EventTypeWarning, ReconcileFailedReason, + corev1.EventTypeWarning, consts.ReconcileErrorReason, "Failed to reconcile: %v", err) return reconcile.Result{}, err } diff --git a/pkg/controller.v1beta1/experiment/experiment_status.go b/pkg/controller.v1beta1/experiment/experiment_controller_status.go similarity index 100% rename from 
pkg/controller.v1beta1/experiment/experiment_status.go rename to pkg/controller.v1beta1/experiment/experiment_controller_status.go diff --git a/pkg/controller.v1beta1/experiment/experiment_controller_test.go b/pkg/controller.v1beta1/experiment/experiment_controller_test.go index 28de09e43da..670c44e8bb3 100644 --- a/pkg/controller.v1beta1/experiment/experiment_controller_test.go +++ b/pkg/controller.v1beta1/experiment/experiment_controller_test.go @@ -33,9 +33,10 @@ import ( ) const ( - experimentName = "test-experiment" - trialName = "test-trial" - namespace = "default" + experimentName = "test-experiment" + trialName = "test-trial" + namespace = "default" + primaryContainer = "tensorflow" timeout = time.Second * 40 ) @@ -372,7 +373,7 @@ func newFakeInstance() *experimentsv1beta1.Experiment { Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "tensorflow", + Name: primaryContainer, Image: "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0", Command: []string{ "python", @@ -415,6 +416,10 @@ func newFakeInstance() *experimentsv1beta1.Experiment { }, ResumePolicy: experimentsv1beta1.NeverResume, TrialTemplate: &experimentsv1beta1.TrialTemplate{ + PrimaryPodLabels: experimentsv1beta1.DefaultKubeflowJobPrimaryPodLabels, + PrimaryContainerName: primaryContainer, + SuccessCondition: experimentsv1beta1.DefaultKubeflowJobSuccessCondition, + FailureCondition: experimentsv1beta1.DefaultKubeflowJobFailureCondition, TrialParameters: []experimentsv1beta1.TrialParameterSpec{ { Name: "learningRate", @@ -509,7 +514,7 @@ func newFakeTFJob() *tfv1.TFJob { Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "tensorflow", + Name: primaryContainer, Image: "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0", Command: []string{ "python", diff --git a/pkg/controller.v1beta1/experiment/experiment_util.go b/pkg/controller.v1beta1/experiment/experiment_controller_util.go similarity index 100% rename from pkg/controller.v1beta1/experiment/experiment_util.go rename to 
pkg/controller.v1beta1/experiment/experiment_controller_util.go diff --git a/pkg/controller.v1beta1/experiment/manifest/generator_test.go b/pkg/controller.v1beta1/experiment/manifest/generator_test.go index 7f6a0686ed9..960ab2deff3 100644 --- a/pkg/controller.v1beta1/experiment/manifest/generator_test.go +++ b/pkg/controller.v1beta1/experiment/manifest/generator_test.go @@ -2,20 +2,21 @@ package manifest import ( "errors" - "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "math" "reflect" "testing" "github.com/golang/mock/gomock" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + commonapiv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" + "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/kubeflow/katib/pkg/controller.v1beta1/util" katibclientmock "github.com/kubeflow/katib/pkg/mock/v1beta1/util/katibclient" - batchv1 "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) func TestGetRunSpecWithHP(t *testing.T) { diff --git a/pkg/controller.v1beta1/experiment/util/status_util.go b/pkg/controller.v1beta1/experiment/util/status_util.go index 24a1ef1d2e8..0677ac924cd 100644 --- a/pkg/controller.v1beta1/experiment/util/status_util.go +++ b/pkg/controller.v1beta1/experiment/util/status_util.go @@ -38,7 +38,6 @@ const ( ExperimentMaxTrialsReachedReason = "ExperimentMaxTrialsReached" ExperimentSuggestionEndReachedReason = "ExperimentSuggestionEndReached" ExperimentFailedReason = "ExperimentFailed" - ExperimentKilledReason = "ExperimentKilled" ) func UpdateExperimentStatus(collector *ExperimentsCollector, instance *experimentsv1beta1.Experiment, trials *trialsv1beta1.TrialList) error { diff --git 
a/pkg/controller.v1beta1/suggestion/suggestion_controller_status.go b/pkg/controller.v1beta1/suggestion/suggestion_controller_status.go index 3c47f157d53..1af2c177ea7 100644 --- a/pkg/controller.v1beta1/suggestion/suggestion_controller_status.go +++ b/pkg/controller.v1beta1/suggestion/suggestion_controller_status.go @@ -12,9 +12,7 @@ const ( SuggestionDeploymentReady = "DeploymentReady" SuggestionDeploymentNotReady = "DeploymentNotReady" SuggestionRunningReason = "SuggestionRunning" - SuggestionSucceededReason = "SuggestionSucceeded" SuggestionFailedReason = "SuggestionFailed" - SuggestionKilledReason = "SuggestionKilled" ) func (r *ReconcileSuggestion) updateStatus(s *suggestionsv1beta1.Suggestion, oldS *suggestionsv1beta1.Suggestion) error { diff --git a/pkg/controller.v1beta1/suggestion/suggestion_controller_test.go b/pkg/controller.v1beta1/suggestion/suggestion_controller_test.go index df8438c32e8..e99649966b4 100644 --- a/pkg/controller.v1beta1/suggestion/suggestion_controller_test.go +++ b/pkg/controller.v1beta1/suggestion/suggestion_controller_test.go @@ -22,7 +22,6 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/onsi/gomega" "golang.org/x/net/context" appsv1 "k8s.io/api/apps/v1" diff --git a/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go b/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go index d46d6ea86fa..1e9fbf87d74 100644 --- a/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go +++ b/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go @@ -8,6 +8,13 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" commonapiv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" commonv1beta1 
"github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" @@ -15,17 +22,8 @@ import ( suggestionsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1" trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" suggestionapi "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" - suggestionapimock "github.com/kubeflow/katib/pkg/mock/v1beta1/api" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" - "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + suggestionapimock "github.com/kubeflow/katib/pkg/mock/v1beta1/api" ) type k8sMatcher struct { diff --git a/pkg/controller.v1beta1/trial/trial_controller.go b/pkg/controller.v1beta1/trial/trial_controller.go index c2a64770b8d..aeb324c8434 100644 --- a/pkg/controller.v1beta1/trial/trial_controller.go +++ b/pkg/controller.v1beta1/trial/trial_controller.go @@ -21,7 +21,6 @@ import ( "fmt" "time" - batchv1beta "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -47,7 +46,6 @@ import ( "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/managerclient" trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util" "github.com/kubeflow/katib/pkg/controller.v1beta1/util" - jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1" "github.com/spf13/viper" ) @@ -90,46 +88,13 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } - // Watch for changes to Trial + // Watch for changes in Trial err = c.Watch(&source.Kind{Type: &trialsv1beta1.Trial{}}, &handler.EnqueueRequestForObject{}) if err != nil { log.Error(err, "Trial watch error") return err } - // Watch for changes to Cronjob - err = c.Watch( - &source.Kind{Type: &batchv1beta.CronJob{}}, - &handler.EnqueueRequestForOwner{ - 
IsController: true, - OwnerType: &trialsv1beta1.Trial{}, - }) - - if err != nil { - log.Error(err, "CronJob watch error") - return err - } - - for _, gvk := range jobv1beta1.SupportedJobList { - unstructuredJob := &unstructured.Unstructured{} - unstructuredJob.SetGroupVersionKind(gvk) - err = c.Watch( - &source.Kind{Type: unstructuredJob}, - &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &trialsv1beta1.Trial{}, - }) - if err != nil { - if meta.IsNoMatchError(err) { - log.Info("Job watch error. CRD might be missing. Please install CRD and restart katib-controller", "CRD Kind", gvk.Kind) - continue - } - return err - } else { - log.Info("Job watch added successfully", "CRD Kind", gvk.Kind) - } - } - trialResources := viper.Get(consts.ConfigTrialResources) if trialResources != nil { // Cast interface to gvk slice object @@ -157,7 +122,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { "CRD Group", gvk.Group, "CRD Version", gvk.Version, "CRD Kind", gvk.Kind) } } - log.Info("Trial controller created") return nil } @@ -223,7 +187,7 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, } logger.Error(err, "Reconcile trial error") r.recorder.Eventf(instance, - corev1.EventTypeWarning, ReconcileFailedReason, + corev1.EventTypeWarning, consts.ReconcileErrorReason, "Failed to reconcile: %v", err) return reconcile.Result{}, err } @@ -261,65 +225,36 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error { // Job already exists // TODO Can desired Spec differ from deployedSpec? 
- if deployedJob != nil { - if instance.Spec.SuccessCondition != "" && instance.Spec.FailureCondition != "" && !instance.IsCompleted() { - jobStatus, err := trialutil.GetDeployedJobStatus(instance, deployedJob) - if err != nil { - logger.Error(err, "GetDeployedJobStatus error") - } - // Not needed to update status if jobStatus is nil - if jobStatus == nil { - return nil - } - // If Job is succeeded update Trial observation - if jobStatus.Condition == trialutil.JobSucceeded { - if err = r.UpdateTrialStatusObservation(instance); err != nil { - logger.Error(err, "Update trial status observation error") - return err - } - } - // If observation is empty metrics collector doesn't finish - if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil { - logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued") - return errMetricsNotReported - } + if deployedJob != nil && !instance.IsCompleted() { + jobStatus, err := trialutil.GetDeployedJobStatus(instance, deployedJob) + if err != nil { + logger.Error(err, "GetDeployedJobStatus error") + } - // Update Trial job status only - // if job has succeeded and if observation field is available. 
- // if job has failed - // This will ensure that trial is set to be complete only if metric is collected at least once - r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus) + // Not needed to update status if jobStatus is nil + if jobStatus == nil { + return nil + } - } else if instance.Spec.SuccessCondition == "" && instance.Spec.FailureCondition == "" { - // TODO (andreyvelich): This can be deleted after switch to custom CRD - kind := deployedJob.GetKind() - jobProvider, err := jobv1beta1.New(kind) - if err != nil { - logger.Error(err, "Failed to create the provider") + // If Job is succeeded update Trial observation + if jobStatus.Condition == trialutil.JobSucceeded { + if err = r.UpdateTrialStatusObservation(instance); err != nil { + logger.Error(err, "Update trial status observation error") return err } - // Currently jobCondition - part of commonv1 TF package for all jobs - jobCondition, err := jobProvider.GetDeployedJobStatus(deployedJob) - if err != nil { - logger.Error(err, "Get deployed status error") - return err - } - - // Update trial observation when the job is succeeded. - if isJobSucceeded(jobCondition) { - if err = r.UpdateTrialStatusObservation(instance); err != nil { - logger.Error(err, "Update trial status observation error") - return err - } - } + } - // Update Trial job status only - // if job has succeeded and if observation field is available. - // if job has failed - // This will ensure that trial is set to be complete only if metric is collected at least once - r.UpdateTrialStatusConditionDeprecated(instance, deployedJob, jobCondition) + // If observation is empty metrics collector doesn't finish + if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil { + logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued") + return errMetricsNotReported } + // Update Trial job status only + // if job has succeeded and if observation field is available. 
+ // if job has failed + // This will ensure that trial is set to be complete only if metric is collected at least once + r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus) } return nil } @@ -349,7 +284,7 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob } // TODO (andreyvelich): Mutate job needs to be refactored (ref: https://github.com/kubeflow/katib/issues/1320) - // Currently, commented since we don't do Mutate Job for SupportedJobList + // Currently, commented since we don't implement it // jobProvider, err := jobv1beta1.New(desiredJob.GetKind()) // if err != nil { // return nil, err @@ -367,6 +302,10 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob logger.Error(err, "Create job error") return nil, err } + // We should assign desiredJob to deployedJob to update Trial status + // properly on the first reconcile run. + deployedJob = desiredJob + eventMsg := fmt.Sprintf("Job %s has been created", desiredJob.GetName()) r.recorder.Eventf(instance, corev1.EventTypeNormal, JobCreatedReason, eventMsg) } else { diff --git a/pkg/controller.v1beta1/trial/trial_controller_consts.go b/pkg/controller.v1beta1/trial/trial_controller_consts.go deleted file mode 100644 index cc0fe390046..00000000000 --- a/pkg/controller.v1beta1/trial/trial_controller_consts.go +++ /dev/null @@ -1,22 +0,0 @@ -package trial - -const ( - DefaultJobKind = "Job" - - // For trials - TrialCreatedReason = "TrialCreated" - TrialRunningReason = "TrialRunning" - TrialSucceededReason = "TrialSucceeded" - TrialMetricsUnavailableReason = "MetricsUnavailable" - TrialFailedReason = "TrialFailed" - TrialKilledReason = "TrialKilled" - - // For Jobs - JobCreatedReason = "JobCreated" - JobDeletedReason = "JobDeleted" - JobSucceededReason = "JobSucceeded" - JobMetricsUnavailableReason = "MetricsUnavailable" - JobFailedReason = "JobFailed" - JobRunningReason = "JobRunning" - ReconcileFailedReason = "ReconcileFailed" -) diff 
--git a/pkg/controller.v1beta1/trial/trial_controller_status.go b/pkg/controller.v1beta1/trial/trial_controller_status.go index e163093a72f..9f4670ba32c 100644 --- a/pkg/controller.v1beta1/trial/trial_controller_status.go +++ b/pkg/controller.v1beta1/trial/trial_controller_status.go @@ -6,6 +6,23 @@ import ( trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" ) +const ( + // For Trials + TrialCreatedReason = "TrialCreated" + TrialRunningReason = "TrialRunning" + TrialSucceededReason = "TrialSucceeded" + TrialMetricsUnavailableReason = "MetricsUnavailable" + TrialFailedReason = "TrialFailed" + + // For Jobs + JobCreatedReason = "JobCreated" + JobDeletedReason = "JobDeleted" + JobSucceededReason = "JobSucceeded" + JobMetricsUnavailableReason = "MetricsUnavailable" + JobFailedReason = "JobFailed" + JobRunningReason = "JobRunning" +) + type updateStatusFunc func(instance *trialsv1beta1.Trial) error func (r *ReconcileTrial) updateStatus(instance *trialsv1beta1.Trial) error { diff --git a/pkg/controller.v1beta1/trial/trial_controller_test.go b/pkg/controller.v1beta1/trial/trial_controller_test.go index a46e066d101..f191d602cf3 100644 --- a/pkg/controller.v1beta1/trial/trial_controller_test.go +++ b/pkg/controller.v1beta1/trial/trial_controller_test.go @@ -1,19 +1,20 @@ package trial import ( + "fmt" "testing" "time" "github.com/golang/mock/gomock" + kubeflowcommonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1" "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus" "github.com/spf13/viper" "golang.org/x/net/context" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" - - batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -21,22 +22,22 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" commonv1beta1 
"github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" + experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" api_pb "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util" util "github.com/kubeflow/katib/pkg/controller.v1beta1/util" managerclientmock "github.com/kubeflow/katib/pkg/mock/v1beta1/trial/managerclient" - kubeflowcommonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" ) const ( - namespace = "default" - trialName = "test-trial" - tfJobName = "test-tfjob" - batchJobName = "test-job" - - timeout = time.Second * 40 + namespace = "default" + trialName = "test-trial" + tfJobName = "test-tfjob" + batchJobName = "test-job" + objectiveMetric = "accuracy" + timeout = time.Second * 40 ) var trialKey = types.NamespacedName{Name: trialName, Namespace: namespace} @@ -52,7 +53,8 @@ func TestAdd(t *testing.T) { mgr, err := manager.New(cfg, manager.Options{}) g.Expect(err).NotTo(gomega.HaveOccurred()) - // Set fake trial resources + // Set Trial resources. + // TFJob controller is installed, MPIJob controller is missing. trialResources := trialutil.GvkListFlag{ { Group: "kubeflow.org", @@ -106,6 +108,18 @@ func TestReconcileTFJob(t *testing.T) { } recFn := SetupTestReconcile(r) + + // Set TFJob resource + trialResources := trialutil.GvkListFlag{ + { + Group: "kubeflow.org", + Version: "v1", + Kind: "TFJob", + }, + } + + viper.Set(consts.ConfigTrialResources, trialResources) + g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred()) stopMgr, mgrStopped := StartTestManager(mgr, g) @@ -115,10 +129,19 @@ func TestReconcileTFJob(t *testing.T) { mgrStopped.Wait() }() - // Empty result for GetTrialObservationLog + // Empty result for GetTrialObservationLog. 
+ // If objective metrics are not parsed, metrics collector reports "unavailable" value to DB. observationLog := &api_pb.GetObservationLogReply{ ObservationLog: &api_pb.ObservationLog{ - MetricLogs: []*api_pb.MetricLog{}, + MetricLogs: []*api_pb.MetricLog{ + { + Metric: &api_pb.Metric{ + Name: objectiveMetric, + Value: consts.UnavailableMetricValue, + }, + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + }, + }, }, } @@ -126,7 +149,7 @@ func TestReconcileTFJob(t *testing.T) { mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil).AnyTimes() // Test - Regural Trial run with TFJob - trial := newFakeTrial(newFakeTFJob()) + trial := newFakeTrialTFJob() tfJob := &tfv1.TFJob{} // Create the Trial @@ -146,6 +169,8 @@ func TestReconcileTFJob(t *testing.T) { // Manually update TFJob status to succeeded // Expect that Trial succeeded status is false with metrics unavailable reason // Metrics unavailable because GetTrialObservationLog returns nil + SucceededReason := "TFJob succeeded test reason" + SucceededMessage := "TFJob succeeded test message" g.Eventually(func() bool { c.Get(context.TODO(), tfJobKey, tfJob) tfJob.Status = kubeflowcommonv1.JobStatus{ @@ -153,8 +178,8 @@ func TestReconcileTFJob(t *testing.T) { { Type: kubeflowcommonv1.JobSucceeded, Status: corev1.ConditionTrue, - Message: "TFJob succeeded test message", - Reason: "TFJob succeeded test reason", + Message: SucceededMessage, + Reason: SucceededReason, }, }, } @@ -164,7 +189,9 @@ func TestReconcileTFJob(t *testing.T) { c.Get(context.TODO(), trialKey, trial) isConditionCorrect := false for _, cond := range trial.Status.Conditions { - if cond.Type == trialsv1beta1.TrialSucceeded && cond.Status == corev1.ConditionFalse && cond.Reason == TrialMetricsUnavailableReason { + if cond.Type == trialsv1beta1.TrialSucceeded && cond.Status == corev1.ConditionFalse && + cond.Reason == fmt.Sprintf("%v. 
Job reason: %v", TrialMetricsUnavailableReason, SucceededReason) && + cond.Message == fmt.Sprintf("Metrics are not available. Job message: %v", SucceededMessage) { isConditionCorrect = true } } @@ -215,6 +242,16 @@ func TestReconcileBatchJob(t *testing.T) { } recFn := SetupTestReconcile(r) + // Set Job resource + trialResources := trialutil.GvkListFlag{ + { + Group: "batch", + Version: "v1", + Kind: "Job", + }, + } + + viper.Set(consts.ConfigTrialResources, trialResources) g.Expect(add(mgr, recFn)).NotTo(gomega.HaveOccurred()) stopMgr, mgrStopped := StartTestManager(mgr, g) @@ -231,14 +268,14 @@ func TestReconcileBatchJob(t *testing.T) { { TimeStamp: "2020-08-10T14:47:38+08:00", Metric: &api_pb.Metric{ - Name: "accuracy", + Name: objectiveMetric, Value: "0.99", }, }, { TimeStamp: "2020-08-10T14:50:38+08:00", Metric: &api_pb.Metric{ - Name: "accuracy", + Name: objectiveMetric, Value: "0.11", }, }, @@ -250,7 +287,7 @@ func TestReconcileBatchJob(t *testing.T) { mockManagerClient.EXPECT().DeleteTrialObservationLog(gomock.Any()).Return(nil, nil).AnyTimes() // Test 1 - Regural Trial run with BatchJob - trial := newFakeTrial(newFakeBatchJob()) + trial := newFakeTrialBatchJob() batchJob := &batchv1.Job{} // Create the Trial @@ -311,11 +348,11 @@ func TestReconcileBatchJob(t *testing.T) { g.Expect(c.Status().Update(context.TODO(), batchJob)).NotTo(gomega.HaveOccurred()) // Create the Trial - trial = newFakeTrial(newFakeBatchJob()) + trial = newFakeTrialBatchJob() g.Expect(c.Create(context.TODO(), trial)).NotTo(gomega.HaveOccurred()) // Expect that Trial status is succeeded and metrics are properly populated - // Metrics available because GetTrialObservationLog returns something + // Metrics available because GetTrialObservationLog returns values g.Eventually(func() bool { c.Get(context.TODO(), trialKey, trial) return trial.IsSucceeded() && @@ -349,13 +386,13 @@ func TestGetObjectiveMetricValue(t *testing.T) { {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: 
&api_pb.Metric{Name: "error", Value: "0.06"}}, {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: &api_pb.Metric{Name: "error", Value: "0.07"}}, {TimeStamp: "2020-04-12T14:47:42+08:00", Metric: &api_pb.Metric{Name: "error", Value: "0.1"}}, - {TimeStamp: "2020-04-13T14:47:38+08:00", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.7"}}, - {TimeStamp: "2020-04-13T14:47:39+08:00", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.71"}}, - {TimeStamp: "2020-04-13T14:47:40+08:00", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.72"}}, - {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.68"}}, - {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.69"}}, - {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.67"}}, - {TimeStamp: "2020-04-12T14:47:42+08:00", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.6"}}, + {TimeStamp: "2020-04-13T14:47:38+08:00", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.7"}}, + {TimeStamp: "2020-04-13T14:47:39+08:00", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.71"}}, + {TimeStamp: "2020-04-13T14:47:40+08:00", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.72"}}, + {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.68"}}, + {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.69"}}, + {TimeStamp: "2020-04-13T14:47:41+08:00", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.67"}}, + {TimeStamp: "2020-04-12T14:47:42+08:00", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.6"}}, } getMetricsFromLogs := func(strategies []commonv1beta1.MetricStrategy) (*commonv1beta1.Metric, *commonv1beta1.Metric, error) { @@ -367,7 +404,7 @@ func TestGetObjectiveMetricValue(t *testing.T) { for index, metric := range observation.Metrics { if metric.Name == "error" { errMetric = &observation.Metrics[index] - 
} else if metric.Name == "accuracy" { + } else if metric.Name == objectiveMetric { accMetric = &observation.Metrics[index] } } @@ -376,7 +413,7 @@ func TestGetObjectiveMetricValue(t *testing.T) { metricStrategies := []commonv1beta1.MetricStrategy{ {Name: "error", Value: commonv1beta1.ExtractByMin}, - {Name: "accuracy", Value: commonv1beta1.ExtractByMax}, + {Name: objectiveMetric, Value: commonv1beta1.ExtractByMax}, } errMetric, accMetric, err := getMetricsFromLogs(metricStrategies) g.Expect(err).ShouldNot(gomega.HaveOccurred()) @@ -391,14 +428,16 @@ func TestGetObjectiveMetricValue(t *testing.T) { // Add one other metric to test correct parsing {TimeStamp: "2020-08-10T14:47:42+08:00", Metric: &api_pb.Metric{Name: "not-accuracy", Value: "1.15"}}, // Add metric with invalid timestamp - {TimeStamp: "2020-08-10T14:47:42", Metric: &api_pb.Metric{Name: "accuracy", Value: "0.77"}}, + {TimeStamp: "2020-08-10T14:47:42", Metric: &api_pb.Metric{Name: objectiveMetric, Value: "0.77"}}, } _, err = getMetrics(invalidLogs, metricStrategies) g.Expect(err).To(gomega.HaveOccurred()) } -func newFakeTFJob() *tfv1.TFJob { - return &tfv1.TFJob{ +func newFakeTrialTFJob() *trialsv1beta1.Trial { + primaryContainer := "tensorflow" + + tfJob := &tfv1.TFJob{ TypeMeta: metav1.TypeMeta{ APIVersion: "kubeflow.org/v1", Kind: "TFJob", @@ -416,7 +455,7 @@ func newFakeTFJob() *tfv1.TFJob { Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "tensorflow", + Name: primaryContainer, Image: "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0", Command: []string{ "python", @@ -437,7 +476,7 @@ func newFakeTFJob() *tfv1.TFJob { Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "tensorflow", + Name: primaryContainer, Image: "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0", Command: []string{ "python", @@ -454,10 +493,36 @@ func newFakeTFJob() *tfv1.TFJob { }, }, } + runSpec, _ := util.ConvertObjectToUnstructured(tfJob) + + return &trialsv1beta1.Trial{ + ObjectMeta: metav1.ObjectMeta{ + Name: trialName, 
+ Namespace: namespace, + }, + Spec: trialsv1beta1.TrialSpec{ + PrimaryPodLabels: experimentsv1beta1.DefaultKubeflowJobPrimaryPodLabels, + PrimaryContainerName: primaryContainer, + SuccessCondition: experimentsv1beta1.DefaultKubeflowJobSuccessCondition, + FailureCondition: experimentsv1beta1.DefaultKubeflowJobFailureCondition, + Objective: &commonv1beta1.ObjectiveSpec{ + ObjectiveMetricName: objectiveMetric, + MetricStrategies: []commonv1beta1.MetricStrategy{ + { + Name: objectiveMetric, + Value: commonv1beta1.ExtractByMax, + }, + }, + }, + RunSpec: runSpec, + }, + } } -func newFakeBatchJob() *batchv1.Job { - return &batchv1.Job{ +func newFakeTrialBatchJob() *trialsv1beta1.Trial { + primaryContainer := "training-container" + + job := &batchv1.Job{ TypeMeta: metav1.TypeMeta{ APIVersion: "batch/v1", Kind: "Job", @@ -471,7 +536,7 @@ func newFakeBatchJob() *batchv1.Job { Spec: corev1.PodSpec{ Containers: []corev1.Container{ { - Name: "training-container", + Name: primaryContainer, Image: "docker.io/kubeflowkatib/mxnet-mnist", Command: []string{ "python3", @@ -486,23 +551,22 @@ func newFakeBatchJob() *batchv1.Job { }, }, } -} - -func newFakeTrial(runObject interface{}) *trialsv1beta1.Trial { - - runSpec, _ := util.ConvertObjectToUnstructured(runObject) + runSpec, _ := util.ConvertObjectToUnstructured(job) - t := &trialsv1beta1.Trial{ + return &trialsv1beta1.Trial{ ObjectMeta: metav1.ObjectMeta{ Name: trialName, Namespace: namespace, }, Spec: trialsv1beta1.TrialSpec{ + PrimaryContainerName: primaryContainer, + SuccessCondition: experimentsv1beta1.DefaultJobSuccessCondition, + FailureCondition: experimentsv1beta1.DefaultJobFailureCondition, Objective: &commonv1beta1.ObjectiveSpec{ - ObjectiveMetricName: "accuracy", + ObjectiveMetricName: objectiveMetric, MetricStrategies: []commonv1beta1.MetricStrategy{ { - Name: "accuracy", + Name: objectiveMetric, Value: commonv1beta1.ExtractByMax, }, }, @@ -510,5 +574,4 @@ func newFakeTrial(runObject interface{}) *trialsv1beta1.Trial { 
RunSpec: runSpec, }, } - return t } diff --git a/pkg/controller.v1beta1/trial/trial_controller_util.go b/pkg/controller.v1beta1/trial/trial_controller_util.go index 08f1a3344ea..9d520366342 100644 --- a/pkg/controller.v1beta1/trial/trial_controller_util.go +++ b/pkg/controller.v1beta1/trial/trial_controller_util.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -32,7 +31,6 @@ import ( api_pb "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util" - commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" ) const ( @@ -121,53 +119,6 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria return } -// TODO (andreyvelich): Can be deleted after custom CRD is implemented -func (r *ReconcileTrial) UpdateTrialStatusConditionDeprecated(instance *trialsv1beta1.Trial, deployedJob *unstructured.Unstructured, jobCondition *commonv1.JobCondition) { - if jobCondition == nil || instance == nil || deployedJob == nil { - return - } - now := metav1.Now() - jobConditionType := (*jobCondition).Type - if jobConditionType == commonv1.JobSucceeded { - if isTrialObservationAvailable(instance) { - msg := "Trial has succeeded" - instance.MarkTrialStatusSucceeded(corev1.ConditionTrue, TrialSucceededReason, msg) - instance.Status.CompletionTime = &now - - eventMsg := fmt.Sprintf("Job %s has succeeded", deployedJob.GetName()) - r.recorder.Eventf(instance, corev1.EventTypeNormal, JobSucceededReason, eventMsg) - r.collector.IncreaseTrialsSucceededCount(instance.Namespace) - } else { - // TODO (andreyvelich): Is is correct to mark succeeded status false when metrics are unavailable? 
- msg := "Metrics are not available" - instance.MarkTrialStatusSucceeded(corev1.ConditionFalse, TrialMetricsUnavailableReason, msg) - - eventMsg := fmt.Sprintf("Metrics are not available for Job %s", deployedJob.GetName()) - r.recorder.Eventf(instance, corev1.EventTypeWarning, JobMetricsUnavailableReason, eventMsg) - } - } else if jobConditionType == commonv1.JobFailed { - msg := "Trial has failed" - instance.MarkTrialStatusFailed(TrialFailedReason, msg) - instance.Status.CompletionTime = &now - - jobConditionMessage := (*jobCondition).Message - eventMsg := fmt.Sprintf("Job %s has failed: %s", deployedJob.GetName(), jobConditionMessage) - r.recorder.Eventf(instance, corev1.EventTypeNormal, JobFailedReason, eventMsg) - r.collector.IncreaseTrialsFailedCount(instance.Namespace) - } else if jobConditionType == commonv1.JobRunning { - msg := "Trial is running" - instance.MarkTrialStatusRunning(TrialRunningReason, msg) - jobConditionMessage := (*jobCondition).Message - eventMsg := fmt.Sprintf("Job %s is running: %s", - deployedJob.GetName(), jobConditionMessage) - r.recorder.Eventf(instance, corev1.EventTypeNormal, - JobRunningReason, eventMsg) - // TODO(gaocegege): Should we maintain a TrialsRunningCount? 
- } - // else nothing to do - return -} - func (r *ReconcileTrial) UpdateTrialStatusObservation(instance *trialsv1beta1.Trial) error { reply, err := r.GetTrialObservationLog(instance) if err != nil { @@ -221,18 +172,6 @@ func isTrialObservationAvailable(instance *trialsv1beta1.Trial) bool { return false } -func isJobSucceeded(jobCondition *commonv1.JobCondition) bool { - if jobCondition == nil { - return false - } - jobConditionType := (*jobCondition).Type - if jobConditionType == commonv1.JobSucceeded { - return true - } - - return false -} - func getMetrics(metricLogs []*api_pb.MetricLog, strategies []commonv1beta1.MetricStrategy) (*commonv1beta1.Observation, error) { metrics := make(map[string]*commonv1beta1.Metric) timestamps := make(map[string]*time.Time) diff --git a/pkg/job/v1beta1/job.go b/pkg/job/v1beta1/job.go deleted file mode 100644 index ac8caf009f5..00000000000 --- a/pkg/job/v1beta1/job.go +++ /dev/null @@ -1,88 +0,0 @@ -package v1beta1 - -import ( - commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - - "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" - "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" -) - -var ( - jobLogger = logf.Log.WithName("provider-job") -) - -// Job is the provider of Job kind. -type Job struct{} - -// GetDeployedJobStatus get the deployed job status. -func (j Job) GetDeployedJobStatus( - deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) { - jobCondition := commonv1.JobCondition{} - // Set default type to running. 
- jobCondition.Type = commonv1.JobRunning - status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status") - if !ok { - if unerr != nil { - jobLogger.Error(unerr, "NestedFieldCopy unstructured to status error") - return nil, unerr - } - // Job does not have the running condition in status, thus we think - // the job is running when it is created. - jobLogger.Info("NestedFieldCopy", "Info", "Job doesn't have status yet") - return nil, nil - } - - statusMap := status.(map[string]interface{}) - jobStatus := batchv1.JobStatus{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) - if err != nil { - jobLogger.Error(err, "Convert unstructured to status error") - return nil, err - } - for _, cond := range jobStatus.Conditions { - if cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue { - jobCondition.Type = commonv1.JobSucceeded - // JobConditions message not populated when succeeded for batchv1 Job - break - } - if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue { - jobCondition.Type = commonv1.JobFailed - jobCondition.Message = cond.Message - break - } - } - return &jobCondition, nil -} - -// IsTrainingContainer returns if the c is the actual training container. 
-func (j Job) IsTrainingContainer(index int, c corev1.Container) bool { - if index == 0 { - // for Job worker, the first container will be taken as worker container, - // katib document should note it - return true - } - return false -} -func (j Job) MutateJob(*v1beta1.Trial, *unstructured.Unstructured) error { - return nil -} - -func (j *Job) Create(kind string) Provider { - return &Job{} -} - -func init() { - ProviderRegistry[consts.JobKindJob] = &Job{} - SupportedJobList[consts.JobKindJob] = schema.GroupVersionKind{ - Group: "batch", - Version: "v1", - Kind: consts.JobKindJob, - } - JobRoleMap[consts.JobKindJob] = []string{} -} diff --git a/pkg/job/v1beta1/kubeflow.go b/pkg/job/v1beta1/kubeflow.go deleted file mode 100644 index 26835e29715..00000000000 --- a/pkg/job/v1beta1/kubeflow.go +++ /dev/null @@ -1,104 +0,0 @@ -package v1beta1 - -import ( - pytorchv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1" - commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" - tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - - "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" - "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" -) - -var ( - kfLogger = logf.Log.WithName("provider-kubeflow") -) - -// Kubeflow is the provider of Kubeflow kinds. -type Kubeflow struct { - Kind string -} - -// GetDeployedJobStatus get the deployed job status. -func (k Kubeflow) GetDeployedJobStatus( - deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) { - jobCondition := commonv1.JobCondition{} - // Set default type to running. 
- jobCondition.Type = commonv1.JobRunning - status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status") - if !ok { - if unerr != nil { - kfLogger.Error(unerr, "NestedFieldCopy unstructured to status error") - return nil, unerr - } - kfLogger.Info("NestedFieldCopy unstructured to status error", - "Info", "Kubeflow Job doesn't have status yet") - return nil, nil - } - - statusMap := status.(map[string]interface{}) - jobStatus := commonv1.JobStatus{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) - if err != nil { - kfLogger.Error(err, "Convert unstructured to status error") - return nil, err - } - // Get the latest condition and set it to jobCondition. - // TODO (andreyvelich): We should also check condition status - if len(jobStatus.Conditions) > 0 { - lc := jobStatus.Conditions[len(jobStatus.Conditions)-1] - jobCondition.Type = lc.Type - jobCondition.Message = lc.Message - jobCondition.Status = lc.Status - jobCondition.Reason = lc.Reason - } - - return &jobCondition, nil -} - -// IsTrainingContainer returns if the c is the actual training container. 
-func (k Kubeflow) IsTrainingContainer(index int, c corev1.Container) bool { - switch k.Kind { - case consts.JobKindTF: - if c.Name == tfv1.DefaultContainerName { - return true - } - case consts.JobKindPyTorch: - if c.Name == pytorchv1.DefaultContainerName { - return true - } - default: - kfLogger.Info("Invalid Katib worker kind", "JobKind", k.Kind) - return false - } - return false -} - -func (k Kubeflow) MutateJob(*v1beta1.Trial, *unstructured.Unstructured) error { - return nil -} - -func (k *Kubeflow) Create(kind string) Provider { - return &Kubeflow{Kind: kind} -} - -func init() { - ProviderRegistry[consts.JobKindTF] = &Kubeflow{} - SupportedJobList[consts.JobKindTF] = schema.GroupVersionKind{ - Group: "kubeflow.org", - Version: "v1", - Kind: consts.JobKindTF, - } - JobRoleMap[consts.JobKindTF] = []string{consts.JobRole, consts.JobRoleTF} - ProviderRegistry[consts.JobKindPyTorch] = &Kubeflow{} - SupportedJobList[consts.JobKindPyTorch] = schema.GroupVersionKind{ - Group: "kubeflow.org", - Version: "v1", - Kind: consts.JobKindPyTorch, - } - JobRoleMap[consts.JobKindPyTorch] = []string{consts.JobRole, consts.JobRolePyTorch} -} diff --git a/pkg/job/v1beta1/provider.go b/pkg/job/v1beta1/provider.go deleted file mode 100644 index 6417a677cba..00000000000 --- a/pkg/job/v1beta1/provider.go +++ /dev/null @@ -1,44 +0,0 @@ -package v1beta1 - -import ( - "fmt" - - commonv1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - - "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" -) - -var ( - ProviderRegistry = make(map[string]Provider) - // JobRoleMap is the map which is used to determin if the replica is master. - // Katib will inject metrics collector into master replica. - JobRoleMap = make(map[string][]string) - // SupportedJobList returns the list of the supported jobs' GVK. 
- SupportedJobList = make(map[string]schema.GroupVersionKind) -) - -// Provider provides utilities for different jobs. -type Provider interface { - // GetDeployedJobStatus get the deployed job status. - GetDeployedJobStatus( - deployedJob *unstructured.Unstructured) (*commonv1.JobCondition, error) - // IsTrainingContainer returns if the c is the actual training container. - IsTrainingContainer(index int, c corev1.Container) bool - // Mutate jobSpec before creation if necessary - MutateJob(*v1beta1.Trial, *unstructured.Unstructured) error - // Recreate Provider from kind - Create(kind string) Provider -} - -// New creates a new Provider. -func New(kind string) (Provider, error) { - if ptr, ok := ProviderRegistry[kind]; ok { - return ptr.Create(kind), nil - } else { - return nil, fmt.Errorf( - "failed to create the provider: Unknown kind %s", kind) - } -} diff --git a/pkg/ui/v1beta1/backend.go b/pkg/ui/v1beta1/backend.go index 56dbbe74135..19d86b1bd0e 100644 --- a/pkg/ui/v1beta1/backend.go +++ b/pkg/ui/v1beta1/backend.go @@ -12,7 +12,6 @@ import ( experimentv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" api_pb_v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" - "github.com/kubeflow/katib/pkg/util/v1beta1/katibclient" ) diff --git a/pkg/webhook/v1beta1/experiment/validator/validator.go b/pkg/webhook/v1beta1/experiment/validator/validator.go index f62ce6e7063..75a80eb268b 100644 --- a/pkg/webhook/v1beta1/experiment/validator/validator.go +++ b/pkg/webhook/v1beta1/experiment/validator/validator.go @@ -24,7 +24,6 @@ import ( "github.com/kubeflow/katib/pkg/controller.v1beta1/experiment/manifest" experimentutil "github.com/kubeflow/katib/pkg/controller.v1beta1/experiment/util" util "github.com/kubeflow/katib/pkg/controller.v1beta1/util" - jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1" mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common" ) @@ -201,6 +200,16 @@ func (g *DefaultValidator) 
validateTrialTemplate(instance *experimentsv1beta1.Ex trialTemplate := instance.Spec.TrialTemplate + // Check if PrimaryContainerName is set + if trialTemplate.PrimaryContainerName == "" { + return fmt.Errorf("spec.trialTemplate.primaryContainerName must be specified") + } + + // Check if SuccessCondition and FailureCondition is set + if trialTemplate.SuccessCondition == "" || trialTemplate.FailureCondition == "" { + return fmt.Errorf("spec.trialTemplate.successCondition and spec.trialTemplate.failureCondition must be specified") + } + // Check if trialParameters exists if trialTemplate.TrialParameters == nil { return fmt.Errorf("spec.trialTemplate.trialParameters must be specified") @@ -282,60 +291,57 @@ func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1beta1.Ex return fmt.Errorf("APIVersion and Kind in spec.trialTemplate must be specified") } - // Check if Job is supported // Check if Job can be converted to Batch Job/TFJob/PyTorchJob - // Other jobs are not validated - if err := g.validateSupportedJob(runSpec); err != nil { + // Other CRDs are not validated + if err := g.validateTrialJob(runSpec); err != nil { return fmt.Errorf("Invalid spec.trialTemplate: %v", err) } return nil } -func (g *DefaultValidator) validateSupportedJob(runSpec *unstructured.Unstructured) error { +func (g *DefaultValidator) validateTrialJob(runSpec *unstructured.Unstructured) error { gvk := runSpec.GroupVersionKind() - supportedJobs := jobv1beta1.SupportedJobList - for _, sJob := range supportedJobs { - if gvk == sJob { - switch gvk.Kind { - case consts.JobKindJob: - batchJob := batchv1.Job{} - - // Validate that RunSpec can be converted to Batch Job - err := runtime.DefaultUnstructuredConverter.FromUnstructured(runSpec.Object, &batchJob) - if err != nil { - return fmt.Errorf("Unable to convert spec.TrialTemplate: %v to %v: %v", runSpec.Object, gvk.Kind, err) - } - err = validatePatchJob(runSpec, batchJob, gvk.Kind) - if err != nil { - return err - } - case 
consts.JobKindTF: - tfJob := &tfv1.TFJob{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(runSpec.Object, &tfJob) - if err != nil { - return fmt.Errorf("Unable to convert spec.TrialTemplate to %v: %v", gvk.Kind, err) - } - err = validatePatchJob(runSpec, tfJob, gvk.Kind) - if err != nil { - return err - } - case consts.JobKindPyTorch: - pytorchJob := &pytorchv1.PyTorchJob{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(runSpec.Object, &pytorchJob) - if err != nil { - return fmt.Errorf("Unable to convert spec.TrialTemplate to %v: %v", gvk.Kind, err) - } - err = validatePatchJob(runSpec, pytorchJob, gvk.Kind) - if err != nil { - return err - } + // Validate only Job, TFJob and PyTorchJob + switch gvk.Kind { + case consts.JobKindJob: + batchJob := batchv1.Job{} - } - return nil + // Validate that RunSpec can be converted to Batch Job + err := runtime.DefaultUnstructuredConverter.FromUnstructured(runSpec.Object, &batchJob) + if err != nil { + return fmt.Errorf("Unable to convert spec.TrialTemplate: %v to %v: %v", runSpec.Object, gvk.Kind, err) } + + // Try to patch runSpec to Batch Job + err = validatePatchJob(runSpec, batchJob, gvk.Kind) + if err != nil { + return err + } + case consts.JobKindTF: + tfJob := &tfv1.TFJob{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(runSpec.Object, &tfJob) + if err != nil { + return fmt.Errorf("Unable to convert spec.TrialTemplate: %v to %v: %v", runSpec.Object, gvk.Kind, err) + } + err = validatePatchJob(runSpec, tfJob, gvk.Kind) + if err != nil { + return err + } + case consts.JobKindPyTorch: + pyTorchJob := &pytorchv1.PyTorchJob{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(runSpec.Object, &pyTorchJob) + if err != nil { + return fmt.Errorf("Unable to convert spec.TrialTemplate: %v to %v: %v", runSpec.Object, gvk.Kind, err) + } + err = validatePatchJob(runSpec, pyTorchJob, gvk.Kind) + if err != nil { + return err + } + } + return nil } diff --git 
a/pkg/webhook/v1beta1/experiment/validator/validator_test.go b/pkg/webhook/v1beta1/experiment/validator/validator_test.go index 9269ec49bec..8aa473984a7 100644 --- a/pkg/webhook/v1beta1/experiment/validator/validator_test.go +++ b/pkg/webhook/v1beta1/experiment/validator/validator_test.go @@ -560,7 +560,28 @@ spec: Err: false, testDescription: "Trial template has custom Kind", }, + // Trial Template doesn't have PrimaryContainerName + { + Instance: func() *experimentsv1beta1.Experiment { + i := newFakeInstance() + i.Spec.TrialTemplate.PrimaryContainerName = "" + return i + }(), + Err: true, + testDescription: "Trial template doesn't have PrimaryContainerName", + }, + // Trial Template doesn't have SuccessCondition + { + Instance: func() *experimentsv1beta1.Experiment { + i := newFakeInstance() + i.Spec.TrialTemplate.SuccessCondition = "" + return i + }(), + Err: true, + testDescription: "Trial template doesn't have SuccessCondition", + }, } + for _, tc := range tcs { err := g.(*DefaultValidator).validateTrialTemplate(tc.Instance) if !tc.Err && err != nil { @@ -571,7 +592,7 @@ spec: } } -func TestValidateSupportedJob(t *testing.T) { +func TestValidateTrialJob(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -729,7 +750,7 @@ spec: } for _, tc := range tcs { - err := g.(*DefaultValidator).validateSupportedJob(tc.RunSpec) + err := g.(*DefaultValidator).validateTrialJob(tc.RunSpec) if !tc.Err && err != nil { t.Errorf("Case: %v failed. 
Expected nil, got %v", tc.testDescription, err) } else if tc.Err && err == nil { @@ -1117,6 +1138,9 @@ func newFakeTrialTemplate(trialJob interface{}, trialParameters []experimentsv1b } return &experimentsv1beta1.TrialTemplate{ + PrimaryContainerName: "training-container", + SuccessCondition: experimentsv1beta1.DefaultKubeflowJobSuccessCondition, + FailureCondition: experimentsv1beta1.DefaultKubeflowJobFailureCondition, TrialSource: experimentsv1beta1.TrialSource{ TrialSpec: trialSpec, }, diff --git a/pkg/webhook/v1beta1/pod/inject_webhook.go b/pkg/webhook/v1beta1/pod/inject_webhook.go index f2519322131..2b072e0bd72 100644 --- a/pkg/webhook/v1beta1/pod/inject_webhook.go +++ b/pkg/webhook/v1beta1/pod/inject_webhook.go @@ -111,14 +111,14 @@ func (s *sidecarInjector) MutationRequired(pod *v1.Pod, ns string) (bool, error) return false, err } - // Try to get Katib job kind and job name from mutating pod - jobKind, jobName, err := s.getKatibJob(object, ns) + // Try to get Katib Job name from mutating pod + _, jobName, err := s.getKatibJob(object, ns) if err != nil { return false, nil } trial := &trialsv1beta1.Trial{} - // jobName and Trial name is equal + // Job name and Trial name is equal if err := s.client.Get(context.TODO(), apitypes.NamespacedName{Name: jobName, Namespace: ns}, trial); err != nil { return false, err } @@ -129,11 +129,6 @@ func (s *sidecarInjector) MutationRequired(pod *v1.Pod, ns string) (bool, error) if !isPrimaryPod(pod.Labels, trial.Spec.PrimaryPodLabels) { return false, nil } - } else { - // TODO (andreyvelich): This can be deleted after switch to custom CRD - if !isMasterRole(pod, jobKind) { - return false, nil - } } if trial.Spec.MetricsCollector.Collector.Kind == common.NoneCollector { @@ -151,7 +146,7 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) } // Try to get Katib job kind and job name from mutating pod - jobKind, jobName, _ := s.getKatibJob(object, namespace) + _, jobName, _ := s.getKatibJob(object, 
namespace) trial := &trialsv1beta1.Trial{} // jobName and Trial name is equal @@ -169,12 +164,12 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) mountPath, pathKind := getMountPath(trial.Spec.MetricsCollector) if mountPath != "" { - if err = mutateVolume(mutatedPod, jobKind, mountPath, injectContainer.Name, trial.Spec.PrimaryContainerName, pathKind); err != nil { + if err = mutateVolume(mutatedPod, mountPath, injectContainer.Name, trial.Spec.PrimaryContainerName, pathKind); err != nil { return nil, err } } if needWrapWorkerContainer(trial.Spec.MetricsCollector) { - if err = wrapWorkerContainer(mutatedPod, namespace, jobKind, mountPath, pathKind, trial); err != nil { + if err = wrapWorkerContainer(trial, mutatedPod, namespace, mountPath, pathKind); err != nil { return nil, err } } diff --git a/pkg/webhook/v1beta1/pod/inject_webhook_test.go b/pkg/webhook/v1beta1/pod/inject_webhook_test.go index 63a5077765c..c42674371f9 100644 --- a/pkg/webhook/v1beta1/pod/inject_webhook_test.go +++ b/pkg/webhook/v1beta1/pod/inject_webhook_test.go @@ -2,6 +2,7 @@ package pod import ( "context" + "fmt" "path/filepath" "reflect" "sync" @@ -20,126 +21,84 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" + experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" - "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/kubeflow/katib/pkg/controller.v1beta1/util" mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common" ) func TestWrapWorkerContainer(t *testing.T) { - testCases := []struct { - Pod *v1.Pod - Namespace string - JobKind string - MetricsFile string - PathKind common.FileSystemKind - Trial *trialsv1beta1.Trial - Expected *v1.Pod - Err bool - Name string - }{ - { - Pod: &v1.Pod{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - 
Name: "tensorflow", - Command: []string{ - "python main.py", - }, - }, - }, - }, - }, - Namespace: "nohere", - JobKind: "TFJob", - MetricsFile: "testfile", - PathKind: common.FileKind, - Trial: &trialsv1beta1.Trial{ - Spec: trialsv1beta1.TrialSpec{ - MetricsCollector: common.MetricsCollectorSpec{ - Collector: &common.CollectorSpec{ - Kind: common.StdOutCollector, - }, - }, - }, - }, - Expected: &v1.Pod{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "tensorflow", - Command: []string{ - "sh", "-c", - }, - Args: []string{ - "python main.py 1>testfile 2>&1 && echo completed > $$$$.pid", - }, - }, - }, + primaryContainer := "tensorflow" + trial := &trialsv1beta1.Trial{ + ObjectMeta: metav1.ObjectMeta{ + Name: "trial-name", + Namespace: "trial-namespace", + }, + Spec: trialsv1beta1.TrialSpec{ + MetricsCollector: common.MetricsCollectorSpec{ + Collector: &common.CollectorSpec{ + Kind: common.StdOutCollector, }, }, - Err: false, - Name: "tensorflow container without sh -c", + PrimaryContainerName: primaryContainer, + PrimaryPodLabels: experimentsv1beta1.DefaultKubeflowJobPrimaryPodLabels, + SuccessCondition: experimentsv1beta1.DefaultKubeflowJobSuccessCondition, + FailureCondition: experimentsv1beta1.DefaultKubeflowJobFailureCondition, }, + } + + metricsFile := "metric.log" + + testCases := []struct { + Trial *trialsv1beta1.Trial + Pod *v1.Pod + MetricsFile string + PathKind common.FileSystemKind + ExpectedPod *v1.Pod + Err bool + TestDescription string + }{ { + Trial: trial, Pod: &v1.Pod{ Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "tensorflow", + Name: primaryContainer, Command: []string{ - "sh", "-c", "python main.py", }, }, }, }, }, - Namespace: "nohere", - JobKind: "TFJob", - MetricsFile: "testfile", + MetricsFile: metricsFile, PathKind: common.FileKind, - Trial: &trialsv1beta1.Trial{ - Spec: trialsv1beta1.TrialSpec{ - MetricsCollector: common.MetricsCollectorSpec{ - Collector: &common.CollectorSpec{ - Kind: common.StdOutCollector, - }, - 
}, - }, - }, - Expected: &v1.Pod{ + ExpectedPod: &v1.Pod{ Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "tensorflow", + Name: primaryContainer, Command: []string{ "sh", "-c", }, Args: []string{ - "python main.py 1>testfile 2>&1 && echo completed > $$$$.pid", + fmt.Sprintf("python main.py 1>%v 2>&1 && echo completed > $$$$.pid", metricsFile), }, }, }, }, }, - Err: false, - Name: "tensorflow container with sh -c", + Err: false, + TestDescription: "Tensorflow container without sh -c", }, { + Trial: trial, Pod: &v1.Pod{ Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "primary-container", - Command: []string{ - "sh", "-c", - "python main.py", - }, - }, - { - Name: "not-primary-container", + Name: primaryContainer, Command: []string{ "sh", "-c", "python main.py", @@ -148,44 +107,28 @@ func TestWrapWorkerContainer(t *testing.T) { }, }, }, - MetricsFile: "testfile", + MetricsFile: metricsFile, PathKind: common.FileKind, - Trial: &trialsv1beta1.Trial{ - Spec: trialsv1beta1.TrialSpec{ - PrimaryContainerName: "primary-container", - MetricsCollector: common.MetricsCollectorSpec{ - Collector: &common.CollectorSpec{ - Kind: common.StdOutCollector, - }, - }, - }, - }, - Expected: &v1.Pod{ + ExpectedPod: &v1.Pod{ Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "primary-container", + Name: primaryContainer, Command: []string{ "sh", "-c", }, Args: []string{ - "python main.py 1>testfile 2>&1 && echo completed > $$$$.pid", - }, - }, - { - Name: "not-primary-container", - Command: []string{ - "sh", "-c", - "python main.py", + fmt.Sprintf("python main.py 1>%v 2>&1 && echo completed > $$$$.pid", metricsFile), }, }, }, }, }, - Err: false, - Name: "Primary container name is set for training pod", + Err: false, + TestDescription: "Tensorflow container with sh -c", }, { + Trial: trial, Pod: &v1.Pod{ Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -195,30 +138,24 @@ func TestWrapWorkerContainer(t *testing.T) { }, }, }, - PathKind: common.FileKind, - Trial: 
&trialsv1beta1.Trial{ - Spec: trialsv1beta1.TrialSpec{ - PrimaryContainerName: "primary-container", - }, - }, - Err: true, - Name: "Training pod doesn't have primary container name", + PathKind: common.FileKind, + Err: true, + TestDescription: "Training pod doesn't have primary container", }, } for _, c := range testCases { - err := wrapWorkerContainer(c.Pod, c.Namespace, c.JobKind, c.MetricsFile, c.PathKind, c.Trial) + err := wrapWorkerContainer(c.Trial, c.Pod, c.Trial.Namespace, c.MetricsFile, c.PathKind) if c.Err && err == nil { - t.Errorf("Case %s failed. Expected error, got nil", c.Name) + t.Errorf("Case %s failed. Expected error, got nil", c.TestDescription) } else if !c.Err { if err != nil { - t.Errorf("Case %s failed. Expected nil, got error: %v", c.Name, err) - } else if !equality.Semantic.DeepEqual(c.Pod.Spec.Containers, c.Expected.Spec.Containers) { + t.Errorf("Case %s failed. Expected nil, got error: %v", c.TestDescription, err) + } else if !equality.Semantic.DeepEqual(c.Pod.Spec.Containers, c.ExpectedPod.Spec.Containers) { t.Errorf("Case %s failed. 
Expected pod: %v, got: %v", - c.Name, c.Expected.Spec.Containers, c.Pod.Spec.Containers) + c.TestDescription, c.ExpectedPod.Spec.Containers, c.Pod.Spec.Containers) } } - } } @@ -227,6 +164,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) { testMetricName := "accuracy" katibDBAddress := "katib-db-manager.kubeflow:6789" testPath := "/test/path" + testCases := []struct { TrialName string MetricName string @@ -452,7 +390,6 @@ func TestMutateVolume(t *testing.T) { }, }, }, - JobKind: "Job", MountPath: common.DefaultFilePath, SidecarContainerName: "metrics-collector", PrimaryContainerName: "train-job", @@ -461,7 +398,6 @@ func TestMutateVolume(t *testing.T) { err := mutateVolume( &tc.Pod, - tc.JobKind, tc.MountPath, tc.SidecarContainerName, tc.PrimaryContainerName, @@ -754,52 +690,6 @@ func TestGetKatibJob(t *testing.T) { } } -func TestIsMasterRole(t *testing.T) { - masterRoleLabel := make(map[string]string) - masterRoleLabel[consts.JobRole] = MasterRole - invalidLabel := make(map[string]string) - invalidLabel["invalid-label"] = "invalid" - testCases := []struct { - Pod v1.Pod - JobKind string - IsMaster bool - Name string - }{ - { - JobKind: "Job", - IsMaster: true, - Name: "Kubernetes Batch Job Pod", - }, - { - Pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: masterRoleLabel, - }, - }, - JobKind: "PyTorchJob", - IsMaster: true, - Name: "Pytorch Master Pod", - }, - { - Pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: invalidLabel, - }, - }, - JobKind: "PyTorchJob", - IsMaster: false, - Name: "Pytorch Pod with invalid label", - }, - } - - for _, tc := range testCases { - isMaster := isMasterRole(&tc.Pod, tc.JobKind) - if isMaster != tc.IsMaster { - t.Errorf("Case %v. 
Expected isMaster %v, got %v", tc.Name, tc.IsMaster, isMaster) - } - } -} - func TestIsPrimaryPod(t *testing.T) { testCases := []struct { podLabels map[string]string diff --git a/pkg/webhook/v1beta1/pod/utils.go b/pkg/webhook/v1beta1/pod/utils.go index 202a4711f36..52df2f36820 100644 --- a/pkg/webhook/v1beta1/pod/utils.go +++ b/pkg/webhook/v1beta1/pod/utils.go @@ -17,7 +17,6 @@ limitations under the License. package pod import ( - "errors" "fmt" "path/filepath" "strings" @@ -32,7 +31,6 @@ import ( common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" katibmanagerv1beta1 "github.com/kubeflow/katib/pkg/common/v1beta1" - jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1" mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common" ) @@ -50,32 +48,6 @@ func isPrimaryPod(podLabels, primaryLabels map[string]string) bool { return true } -func isMasterRole(pod *v1.Pod, jobKind string) bool { - if labels, ok := jobv1beta1.JobRoleMap[jobKind]; ok { - if len(labels) == 0 { - return true - } - for _, label := range labels { - if v, err := getLabel(pod, label); err == nil { - if v == MasterRole { - return true - } - } - } - } - return false -} - -func getLabel(pod *v1.Pod, targetLabel string) (string, error) { - labels := pod.Labels - for k, v := range labels { - if k == targetLabel { - return v, nil - } - } - return "", errors.New("Label " + targetLabel + " not found.") -} - func getRemoteImage(pod *v1.Pod, namespace string, containerIndex int) (crv1.Image, error) { // verify the image name, then download the remote config file c := pod.Spec.Containers[containerIndex] @@ -175,25 +147,14 @@ func needWrapWorkerContainer(mc common.MetricsCollectorSpec) bool { return false } -func wrapWorkerContainer( - pod *v1.Pod, namespace, jobKind, metricsFile string, - pathKind common.FileSystemKind, - trial *trialsv1beta1.Trial) error { +func wrapWorkerContainer(trial 
*trialsv1beta1.Trial, pod *v1.Pod, namespace, + metricsFile string, pathKind common.FileSystemKind) error { + // Search for primary container. index := -1 for i, c := range pod.Spec.Containers { - if trial.Spec.PrimaryContainerName != "" && c.Name == trial.Spec.PrimaryContainerName { + if c.Name == trial.Spec.PrimaryContainerName { index = i break - // TODO (andreyvelich): This can be deleted after switch to custom CRD - } else if trial.Spec.PrimaryContainerName == "" { - jobProvider, err := jobv1beta1.New(jobKind) - if err != nil { - return err - } - if jobProvider.IsTrainingContainer(i, c) { - index = i - break - } } } if index >= 0 { @@ -236,7 +197,7 @@ func getMarkCompletedCommand(mountPath string, pathKind common.FileSystemKind) s return fmt.Sprintf("echo %s > %s", mccommon.TrainingCompleted, pidFile) } -func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName, primaryContainerName string, pathKind common.FileSystemKind) error { +func mutateVolume(pod *v1.Pod, mountPath, sidecarContainerName, primaryContainerName string, pathKind common.FileSystemKind) error { metricsVol := v1.Volume{ Name: common.MetricsVolume, VolumeSource: v1.VolumeSource{ @@ -251,27 +212,19 @@ func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName, primary Name: metricsVol.Name, MountPath: dir, } + indexList := []int{} for i, c := range pod.Spec.Containers { shouldMount := false - if c.Name == sidecarContainerName { + // We should mount volume only on sidecar and primary containers + if c.Name == sidecarContainerName || c.Name == primaryContainerName { shouldMount = true - } else { - if primaryContainerName != "" && c.Name == primaryContainerName { - shouldMount = true - // TODO (andreyvelich): This can be deleted after switch to custom CRD - } else if primaryContainerName == "" { - jobProvider, err := jobv1beta1.New(jobKind) - if err != nil { - return err - } - shouldMount = jobProvider.IsTrainingContainer(i, c) - } } if shouldMount { indexList = 
append(indexList, i) } } + for _, i := range indexList { c := &pod.Spec.Containers[i] if c.VolumeMounts == nil { diff --git a/test/e2e/v1beta1/invalid-experiment.yaml b/test/e2e/v1beta1/invalid-experiment.yaml index fadfdfb4bd2..f3bdbabf325 100644 --- a/test/e2e/v1beta1/invalid-experiment.yaml +++ b/test/e2e/v1beta1/invalid-experiment.yaml @@ -33,6 +33,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model diff --git a/test/e2e/v1beta1/valid-experiment.yaml b/test/e2e/v1beta1/valid-experiment.yaml index dc3699f64ab..fe0d24f19ba 100644 --- a/test/e2e/v1beta1/valid-experiment.yaml +++ b/test/e2e/v1beta1/valid-experiment.yaml @@ -33,6 +33,7 @@ spec: - adam - ftrl trialTemplate: + primaryContainerName: training-container trialParameters: - name: learningRate description: Learning rate for the training model