diff --git a/examples/v1beta1/resume-experiment/from-volume-resume.yaml b/examples/v1beta1/resume-experiment/from-volume-resume.yaml new file mode 100644 index 00000000000..44d83410e17 --- /dev/null +++ b/examples/v1beta1/resume-experiment/from-volume-resume.yaml @@ -0,0 +1,66 @@ +apiVersion: "kubeflow.org/v1beta1" +kind: Experiment +metadata: + namespace: kubeflow + labels: + controller-tools.k8s.io: "1.0" + name: from-volume-resume +spec: + objective: + type: maximize + goal: 0.99 + objectiveMetricName: Validation-accuracy + additionalMetricNames: + - Train-accuracy + algorithm: + algorithmName: random + parallelTrialCount: 3 + maxTrialCount: 12 + maxFailedTrialCount: 3 + resumePolicy: FromVolume + parameters: + - name: lr + parameterType: double + feasibleSpace: + min: "0.01" + max: "0.03" + - name: num-layers + parameterType: int + feasibleSpace: + min: "2" + max: "5" + - name: optimizer + parameterType: categorical + feasibleSpace: + list: + - sgd + - adam + - ftrl + trialTemplate: + trialParameters: + - name: learningRate + description: Learning rate for the training model + reference: lr + - name: numberLayers + description: Number of training model layers + reference: num-layers + - name: optimizer + description: Training model optimizer (sdg, adam or ftrl) + reference: optimizer + trialSpec: + apiVersion: batch/v1 + kind: Job + spec: + template: + spec: + containers: + - name: training-container + image: docker.io/kubeflowkatib/mxnet-mnist + command: + - "python3" + - "/opt/mxnet-mnist/mnist.py" + - "--batch-size=64" + - "--lr=${trialParameters.learningRate}" + - "--num-layers=${trialParameters.numberLayers}" + - "--optimizer=${trialParameters.optimizer}" + restartPolicy: Never diff --git a/examples/v1beta1/never-resume-example.yaml b/examples/v1beta1/resume-experiment/never-resume.yaml similarity index 98% rename from examples/v1beta1/never-resume-example.yaml rename to examples/v1beta1/resume-experiment/never-resume.yaml index bbcfb79c29e..5452c829664 100644 --- a/examples/v1beta1/never-resume-example.yaml +++ b/examples/v1beta1/resume-experiment/never-resume.yaml @@ -4,7 +4,7 @@ metadata: namespace: kubeflow labels: controller-tools.k8s.io: "1.0" - name: never-resume-example + name: never-resume spec: objective: type: maximize diff --git a/manifests/v1beta1/katib-controller/rbac.yaml b/manifests/v1beta1/katib-controller/rbac.yaml index a0971c44fed..dec9b5373f4 100644 --- a/manifests/v1beta1/katib-controller/rbac.yaml +++ b/manifests/v1beta1/katib-controller/rbac.yaml @@ -12,6 +12,8 @@ rules: - secrets - events - namespaces + - persistentvolumes + - persistentvolumeclaims verbs: - "*" - apiGroups: diff --git a/pkg/apis/controller/experiments/v1beta1/experiment_types.go b/pkg/apis/controller/experiments/v1beta1/experiment_types.go index 4aa7cf7b8cd..2c8eb95a7a3 100644 --- a/pkg/apis/controller/experiments/v1beta1/experiment_types.go +++ b/pkg/apis/controller/experiments/v1beta1/experiment_types.go @@ -157,11 +157,21 @@ const ( ExperimentFailed ExperimentConditionType = "Failed" ) +// ResumePolicyType describes how the experiment should be resumed. +// Only one of the following resume policies may be specified. +// If none of the following policies is specified, the default one is LongRunning. type ResumePolicyType string const ( + // NeverResume indicates that experiment can't be resumed. NeverResume ResumePolicyType = "Never" + // LongRunning indicates that experiment's suggestion resources + // (deployment and service) are always running. LongRunning ResumePolicyType = "LongRunning" + // FromVolume indicates that volume is attached to experiment's + // suggestion. Suggestion data can be retained in the volume. + // When experiment is succeeded suggestion deployment and service are deleted. + FromVolume ResumePolicyType = "FromVolume" ) type ParameterSpec struct { diff --git a/pkg/apis/controller/suggestions/v1beta1/suggestion_types.go b/pkg/apis/controller/suggestions/v1beta1/suggestion_types.go index a111516a05a..caa6970c8b0 100644 --- a/pkg/apis/controller/suggestions/v1beta1/suggestion_types.go +++ b/pkg/apis/controller/suggestions/v1beta1/suggestion_types.go @@ -21,13 +21,19 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" + + experiment "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" ) -// SuggestionSpec defines the desired state of Suggestion +// SuggestionSpec defines the desired state of suggestion. type SuggestionSpec struct { + // Name of the algorithm that suggestion is used. AlgorithmName string `json:"algorithmName"` // Number of suggestions requested Requests int32 `json:"requests,omitempty"` + // Describes resuming policy which usually take effect after experiment terminated. + // Default value is LongRunning. + ResumePolicy experiment.ResumePolicyType `json:"resumePolicy,omitempty"` } // SuggestionStatus defines the observed state of Suggestion diff --git a/pkg/apis/controller/suggestions/v1beta1/util.go b/pkg/apis/controller/suggestions/v1beta1/util.go index 33d3878ccba..0455a1a10ff 100644 --- a/pkg/apis/controller/suggestions/v1beta1/util.go +++ b/pkg/apis/controller/suggestions/v1beta1/util.go @@ -90,18 +90,34 @@ func (suggestion *Suggestion) MarkSuggestionStatusCreated(reason, message string suggestion.setCondition(SuggestionCreated, v1.ConditionTrue, reason, message) } -func (suggestion *Suggestion) MarkSuggestionStatusRunning(reason, message string) { - //suggestion.removeCondition(SuggestionRestarting) - suggestion.setCondition(SuggestionRunning, v1.ConditionTrue, reason, message) +// MarkSuggestionStatusRunning sets suggestion Running status. +func (suggestion *Suggestion) MarkSuggestionStatusRunning(status v1.ConditionStatus, reason, message string) { + // When suggestion is restrating we need to remove succeeded status from suggestion. + // That should happen only when ResumePolicy = FromVolume + suggestion.removeCondition(SuggestionSucceeded) + suggestion.setCondition(SuggestionRunning, status, reason, message) } +// MarkSuggestionStatusSucceeded sets suggestion Succeeded status to true. +// Suggestion can be succeeded only if ResumeExperiment = Never or ResumeExperiment = FromVolume func (suggestion *Suggestion) MarkSuggestionStatusSucceeded(reason, message string) { - currentCond := getCondition(suggestion, SuggestionRunning) - if currentCond != nil { - suggestion.setCondition(SuggestionRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message) + + // When suggestion is Succeeded suggestion Running status is false + runningCond := getCondition(suggestion, SuggestionRunning) + succeededReason := "Suggestion is succeeded" + if runningCond != nil { + msg := "Suggestion is not running" + suggestion.setCondition(SuggestionRunning, v1.ConditionFalse, succeededReason, msg) + } + + // When suggestion is Succeeded suggestion DeploymentReady status is false + deploymentReadyCond := getCondition(suggestion, SuggestionDeploymentReady) + if deploymentReadyCond != nil { + msg := "Deployment is not ready" + suggestion.setCondition(SuggestionDeploymentReady, v1.ConditionFalse, succeededReason, msg) } - suggestion.setCondition(SuggestionSucceeded, v1.ConditionTrue, reason, message) + suggestion.setCondition(SuggestionSucceeded, v1.ConditionTrue, reason, message) } func (suggestion *Suggestion) MarkSuggestionStatusFailed(reason, message string) { diff --git a/pkg/controller.v1beta1/consts/const.go b/pkg/controller.v1beta1/consts/const.go index 8a148c1b405..841828e414a 100644 --- a/pkg/controller.v1beta1/consts/const.go +++ b/pkg/controller.v1beta1/consts/const.go @@ -3,6 +3,8 @@ package consts import ( "time" + corev1 "k8s.io/api/core/v1" + "github.com/kubeflow/katib/pkg/util/v1beta1/env" ) @@ -147,6 +149,25 @@ const ( // UnavailableMetricValue is the value when metric was not reported or metric value can't be converted to float64 UnavailableMetricValue = "unavailable" + + // DefaultSuggestionVolumeLocalPathPrefix is the default cluster local path prefix for suggestion volume + // Full local path = /tmp/katib/suggestions/- + DefaultSuggestionVolumeLocalPathPrefix = "/tmp/katib/suggestions/" + + // DefaultSuggestionStorageClassName is the default value for suggestion's volume storage class name + DefaultSuggestionStorageClassName = "katib-suggestion" + + // DefaultSuggestionVolumeAccessMode is the default value for suggestion's volume access mode + DefaultSuggestionVolumeAccessMode = corev1.ReadWriteOnce + + // DefaultSuggestionVolumeStorage is the default value for suggestion's volume storage + DefaultSuggestionVolumeStorage = "1Gi" + + // ContainerSuggestionVolumeName is the volume name that mounted on suggestion container + ContainerSuggestionVolumeName = "suggestion-volume" + + // DefaultContainerSuggestionVolumeMountPath is the default mount path in suggestion container + DefaultContainerSuggestionVolumeMountPath = "/opt/katib/data" ) var ( diff --git a/pkg/controller.v1beta1/experiment/experiment_controller.go b/pkg/controller.v1beta1/experiment/experiment_controller.go index 20edfb6b36e..1fc4ec2ef3d 100644 --- a/pkg/controller.v1beta1/experiment/experiment_controller.go +++ b/pkg/controller.v1beta1/experiment/experiment_controller.go @@ -101,7 +101,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { } if err = addWatch(mgr, c); err != nil { - log.Error(err, "Trial watch failed") + log.Error(err, "addWatch failed") return err } @@ -187,27 +187,39 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re } if instance.IsCompleted() { + // Cleanup suggestion after Experiment is finished. + // If ResumePolicy = Never or ResumePolicy = FromVolume, delete suggestion deployment, service and mark suggestion status succeeded. + if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume || instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume { + err := r.cleanupSuggestionResources(instance) + if err != nil { + logger.Error(err, "cleanupSuggestionResources error") + return reconcile.Result{}, err + } + } // Check if completed instance is restartable - // Experiment is restartable only if it is in succeeded state by reaching max trials - // And Resume Policy is LongRunning + // Experiment is restartable only if it is in succeeded state by reaching max trials and + // ResumePolicy = LongRunning or ResumePolicy = FromVolume if util.IsCompletedExperimentRestartable(instance) { // Check if max trials is reconfigured if (instance.Spec.MaxTrialCount != nil && *instance.Spec.MaxTrialCount != instance.Status.Trials) || (instance.Spec.MaxTrialCount == nil && instance.Status.Trials != 0) { - logger.Info("Experiment is restarting") + logger.Info("Experiment is restarting", + "MaxTrialCount", instance.Spec.MaxTrialCount, + "ParallelTrialCount", instance.Spec.ParallelTrialCount, + "MaxFailedTrialCount", instance.Spec.MaxFailedTrialCount) msg := "Experiment is restarted" instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg) - } - } else { - // Terminate Suggestion after Experiment is finished if Resume Policy is Never - if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume { - err := r.terminateSuggestion(instance) - if err != nil { - logger.Error(err, "Terminate Suggestion error") + // If ResumePolicy = FromVolume, suggestion must remove succeeded status and add running status when restarting + if instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume { + err := r.restartSuggestion(instance) + if err != nil { + logger.Error(err, "restartSuggestion error") + return reconcile.Result{}, err + } } - return reconcile.Result{}, err } + } else { // If experiment is completed with no running trials, stop reconcile if !instance.HasRunningTrials() { return reconcile.Result{}, nil diff --git a/pkg/controller.v1beta1/experiment/experiment_controller_test.go b/pkg/controller.v1beta1/experiment/experiment_controller_test.go index cd21844db52..d7382a6c6b4 100644 --- a/pkg/controller.v1beta1/experiment/experiment_controller_test.go +++ b/pkg/controller.v1beta1/experiment/experiment_controller_test.go @@ -88,7 +88,7 @@ func TestCreateExperiment(t *testing.T) { mgrStopped.Wait() }() - // Create the Trial object and expect the Reconcile and Deployment to be created + // Create the experiment object and expect the Reconcile and Deployment to be created err = c.Create(context.TODO(), instance) // The instance object may not be a valid object because it might be missing some required fields. // Please modify the instance object by adding required fields and then remove the following if statement. @@ -234,7 +234,7 @@ func TestReconcileExperiment(t *testing.T) { mgrStopped.Wait() }() - // Create the Trial object and expect the Reconcile and Deployment to be created + // Create the experiment object and expect the Reconcile and Deployment to be created err = c.Create(context.TODO(), instance) // The instance object may not be a valid object because it might be missing some required fields. // Please modify the instance object by adding required fields and then remove the following if statement. diff --git a/pkg/controller.v1beta1/experiment/experiment_util.go b/pkg/controller.v1beta1/experiment/experiment_util.go index d6e0d986716..410a37f880f 100644 --- a/pkg/controller.v1beta1/experiment/experiment_util.go +++ b/pkg/controller.v1beta1/experiment/experiment_util.go @@ -5,6 +5,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -12,7 +13,6 @@ import ( experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" suggestionsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1" trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" - suggestionController "github.com/kubeflow/katib/pkg/controller.v1beta1/suggestion" "github.com/kubeflow/katib/pkg/controller.v1beta1/util" ) @@ -103,7 +103,8 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1beta1.Expe } } -func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1beta1.Experiment) error { +func (r *ReconcileExperiment) cleanupSuggestionResources(instance *experimentsv1beta1.Experiment) error { + logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) original := &suggestionsv1beta1.Suggestion{} err := r.Get(context.TODO(), types.NamespacedName{Namespace: instance.GetNamespace(), Name: instance.GetName()}, original) @@ -113,15 +114,53 @@ func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1beta1.E } return err } + // If Suggestion is failed or Suggestion is Succeeded, not needed to terminate Suggestion if original.IsFailed() || original.IsSucceeded() { return nil } - log.Info("Start terminating suggestion") + + logger.Info("Start cleanup suggestion resources") + suggestion := original.DeepCopy() + + reason := "Experiment is succeeded" + // If ResumePolicy = Never, mark suggestion status succeeded, can't be restarted + if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume { + msg := "Suggestion is succeeded, can't be restarted" + suggestion.MarkSuggestionStatusSucceeded(reason, msg) + logger.Info("Mark suggestion succeeded, can't be restarted") + + // If ResumePolicy = FromVolume, mark suggestion status succeeded, can be restarted + } else if instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume { + msg := "Suggestion is succeeded, suggestion volume is not deleted, can be restarted" + suggestion.MarkSuggestionStatusSucceeded(reason, msg) + logger.Info("Mark suggestion succeeded, can be restarted") + } + + if err := r.UpdateSuggestionStatus(suggestion); err != nil { + return err + } + return nil +} + +func (r *ReconcileExperiment) restartSuggestion(instance *experimentsv1beta1.Experiment) error { + logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + original := &suggestionsv1beta1.Suggestion{} + err := r.Get(context.TODO(), + types.NamespacedName{Namespace: instance.GetNamespace(), Name: instance.GetName()}, original) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + + logger.Info("Suggestion is restarting, suggestion Running status is false") suggestion := original.DeepCopy() - msg := "Suggestion is succeeded" - suggestion.MarkSuggestionStatusSucceeded(suggestionController.SuggestionSucceededReason, msg) - log.Info("Mark suggestion succeeded") + reason := "Experiment is restarting" + msg := "Suggestion is not running" + // Mark suggestion status not running because experiment is restarting and suggestion deployment is not ready + suggestion.MarkSuggestionStatusRunning(corev1.ConditionFalse, reason, msg) if err := r.UpdateSuggestionStatus(suggestion); err != nil { return err diff --git a/pkg/controller.v1beta1/experiment/suggestion/suggestion.go b/pkg/controller.v1beta1/experiment/suggestion/suggestion.go index 700b091c86d..0297d133549 100644 --- a/pkg/controller.v1beta1/experiment/suggestion/suggestion.go +++ b/pkg/controller.v1beta1/experiment/suggestion/suggestion.go @@ -66,6 +66,7 @@ func (g *General) createSuggestion(instance *experimentsv1beta1.Experiment, sugg Spec: suggestionsv1beta1.SuggestionSpec{ AlgorithmName: instance.Spec.Algorithm.AlgorithmName, Requests: suggestionRequests, + ResumePolicy: instance.Spec.ResumePolicy, }, } diff --git a/pkg/controller.v1beta1/experiment/util/status_util.go b/pkg/controller.v1beta1/experiment/util/status_util.go index 57f5b6dff70..19e68a449e4 100644 --- a/pkg/controller.v1beta1/experiment/util/status_util.go +++ b/pkg/controller.v1beta1/experiment/util/status_util.go @@ -18,6 +18,7 @@ package util import ( "strconv" + "k8s.io/apimachinery/pkg/types" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" @@ -175,6 +176,7 @@ func getObjectiveMetricValue(trial trialsv1beta1.Trial) string { // UpdateExperimentStatusCondition updates the experiment status. func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance *experimentsv1beta1.Experiment, isObjectiveGoalReached bool, getSuggestionDone bool) { + logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) completedTrialsCount := instance.Status.TrialsSucceeded + instance.Status.TrialsFailed + instance.Status.TrialsKilled failedTrialsCount := instance.Status.TrialsFailed activeTrialsCount := instance.Status.TrialsPending + instance.Status.TrialsRunning @@ -185,6 +187,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * instance.MarkExperimentStatusSucceeded(ExperimentGoalReachedReason, msg) instance.Status.CompletionTime = &now collector.IncreaseExperimentsSucceededCount(instance.Namespace) + logger.Info(msg) return } @@ -194,6 +197,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * instance.MarkExperimentStatusFailed(ExperimentFailedReason, msg) instance.Status.CompletionTime = &now collector.IncreaseExperimentsFailedCount(instance.Namespace) + logger.Info(msg) return } @@ -203,6 +207,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * instance.MarkExperimentStatusSucceeded(ExperimentMaxTrialsReachedReason, msg) instance.Status.CompletionTime = &now collector.IncreaseExperimentsSucceededCount(instance.Namespace) + logger.Info(msg) return } @@ -211,6 +216,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * instance.MarkExperimentStatusSucceeded(ExperimentSuggestionEndReachedReason, msg) instance.Status.CompletionTime = &now collector.IncreaseExperimentsSucceededCount(instance.Namespace) + logger.Info(msg) return } @@ -218,8 +224,10 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * instance.MarkExperimentStatusRunning(ExperimentRunningReason, msg) } +// IsCompletedExperimentRestartable returns whether experiment is restartable or not func IsCompletedExperimentRestartable(instance *experimentsv1beta1.Experiment) bool { - if instance.IsSucceeded() && instance.IsCompletedReason(ExperimentMaxTrialsReachedReason) && instance.Spec.ResumePolicy == experimentsv1beta1.LongRunning { + if instance.IsSucceeded() && instance.IsCompletedReason(ExperimentMaxTrialsReachedReason) && + (instance.Spec.ResumePolicy == experimentsv1beta1.LongRunning || instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume) { return true } return false diff --git a/pkg/controller.v1beta1/suggestion/composer/composer.go b/pkg/controller.v1beta1/suggestion/composer/composer.go index 6e19202a592..76adbe1eda6 100644 --- a/pkg/controller.v1beta1/suggestion/composer/composer.go +++ b/pkg/controller.v1beta1/suggestion/composer/composer.go @@ -14,7 +14,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" suggestionsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1" + "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/kubeflow/katib/pkg/controller.v1beta1/util" "github.com/kubeflow/katib/pkg/util/v1beta1/katibconfig" @@ -37,6 +39,7 @@ var ( type Composer interface { DesiredDeployment(s *suggestionsv1beta1.Suggestion) (*appsv1.Deployment, error) DesiredService(s *suggestionsv1beta1.Suggestion) (*corev1.Service, error) + DesiredVolume(s *suggestionsv1beta1.Suggestion) (*corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) CreateComposer(mgr manager.Manager) Composer } @@ -51,6 +54,7 @@ func New(mgr manager.Manager) Composer { return ptr.CreateComposer(mgr) } +// DesiredDeployment returns desired deployment for suggestion func (g *General) DesiredDeployment(s *suggestionsv1beta1.Suggestion) (*appsv1.Deployment, error) { suggestionConfigData, err := katibconfig.GetSuggestionConfigData(s.Spec.AlgorithmName, g.Client) @@ -93,12 +97,28 @@ func (g *General) DesiredDeployment(s *suggestionsv1beta1.Suggestion) (*appsv1.D d.Spec.Template.Spec.ServiceAccountName = suggestionConfigData[consts.LabelSuggestionServiceAccountName] } + // Attach volume to the suggestion pod spec if ResumePolicy = FromVolume + if s.Spec.ResumePolicy == experimentsv1beta1.FromVolume { + d.Spec.Template.Spec.Volumes = []corev1.Volume{ + { + Name: consts.ContainerSuggestionVolumeName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: util.GetAlgorithmPersistentVolumeClaimName(s), + }, + }, + }, + } + } + if err := controllerutil.SetControllerReference(s, d, g.scheme); err != nil { return nil, err } + return d, nil } +// DesiredService returns desired service for suggestion func (g *General) DesiredService(s *suggestionsv1beta1.Suggestion) (*corev1.Service, error) { ports := []corev1.ServicePort{ { @@ -218,9 +238,93 @@ func (g *General) desiredContainer(s *suggestionsv1beta1.Suggestion, suggestionC FailureThreshold: defaultFailureThreshold, } } + + // Attach volume mounts to the suggestion container if ResumePolicy = FromVolume + if s.Spec.ResumePolicy == experimentsv1beta1.FromVolume { + c.VolumeMounts = []corev1.VolumeMount{ + { + Name: consts.ContainerSuggestionVolumeName, + MountPath: consts.DefaultContainerSuggestionVolumeMountPath, + }, + } + } return c, nil } +// DesiredVolume returns desired PVC and PV for suggestion. +// If StorageClassName != DefaultSuggestionStorageClassName returns only PVC. +func (g *General) DesiredVolume(s *suggestionsv1beta1.Suggestion) (*corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) { + persistentVolumeName := util.GetAlgorithmPersistentVolumeName(s) + + // TODO (andreyvelich): Enable to specify these values from Katib config + storageClassName := consts.DefaultSuggestionStorageClassName + persistentVolumePath := consts.DefaultSuggestionVolumeLocalPathPrefix + persistentVolumeName + volumeAccessModes := consts.DefaultSuggestionVolumeAccessMode + + volumeStorage, err := resource.ParseQuantity(consts.DefaultSuggestionVolumeStorage) + if err != nil { + return nil, nil, err + } + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: util.GetAlgorithmPersistentVolumeClaimName(s), + Namespace: s.Namespace, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + StorageClassName: &storageClassName, + AccessModes: []corev1.PersistentVolumeAccessMode{ + volumeAccessModes, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: volumeStorage, + }, + }, + }, + } + + // Add owner reference to the pvc so that it could be GC after the suggestion is deleted + if err := controllerutil.SetControllerReference(s, pvc, g.scheme); err != nil { + return nil, nil, err + } + + var pv *corev1.PersistentVolume + // Create PV with local hostPath by default + if storageClassName == consts.DefaultSuggestionStorageClassName { + localLabel := map[string]string{"type": "local"} + + pv = &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: persistentVolumeName, + Labels: localLabel, + }, + Spec: corev1.PersistentVolumeSpec{ + StorageClassName: consts.DefaultSuggestionStorageClassName, + AccessModes: []corev1.PersistentVolumeAccessMode{ + volumeAccessModes, + }, + PersistentVolumeSource: corev1.PersistentVolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: persistentVolumePath, + }, + }, + Capacity: corev1.ResourceList{ + corev1.ResourceStorage: volumeStorage, + }, + }, + } + + // Add owner reference to the pv so that it could be GC after the suggestion is deleted + if err := controllerutil.SetControllerReference(s, pv, g.scheme); err != nil { + return nil, nil, err + } + + } + + return pvc, pv, nil +} + func (g *General) CreateComposer(mgr manager.Manager) Composer { return &General{mgr.GetScheme(), mgr.GetClient()} } diff --git a/pkg/controller.v1beta1/suggestion/suggestion_controller.go b/pkg/controller.v1beta1/suggestion/suggestion_controller.go index a20a82f01af..7e955cbce44 100644 --- a/pkg/controller.v1beta1/suggestion/suggestion_controller.go +++ b/pkg/controller.v1beta1/suggestion/suggestion_controller.go @@ -95,6 +95,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { if err != nil { return err } + + err = c.Watch(&source.Kind{Type: &corev1.PersistentVolumeClaim{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &suggestionsv1beta1.Suggestion{}, + }) + if err != nil { + return err + } + log.Info("Suggestion controller created") return nil } @@ -134,7 +143,7 @@ func (r *ReconcileSuggestion) Reconcile(request reconcile.Request) (reconcile.Re return reconcile.Result{}, err } instance := oldS.DeepCopy() - // If ResumePolicyType is LongRunning, suggestion status will never be succeeded. + // Suggestion will be succeeded if ResumePolicy = Never or ResumePolicy = FromVolume if instance.IsSucceeded() { err = r.deleteDeployment(instance) if err != nil { @@ -160,7 +169,7 @@ func (r *ReconcileSuggestion) Reconcile(request reconcile.Request) (reconcile.Re consts.ReconcileErrorReason, err.Error()) // Try updating just the status condition when possible - // Status conditions might need to be updated even in error + // Status conditions might need to be updated even in error // Ignore all other status fields else it will be inconsistent during retry _ = r.updateStatusCondition(instance, oldS) logger.Error(err, "Reconcile Suggestion error") @@ -174,8 +183,25 @@ func (r *ReconcileSuggestion) Reconcile(request reconcile.Request) (reconcile.Re return reconcile.Result{}, nil } +// ReconcileSuggestion is the main reconcile loop for suggestion CR. func (r *ReconcileSuggestion) ReconcileSuggestion(instance *suggestionsv1beta1.Suggestion) error { logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + + // If ResumePolicy = FromVolume volume is reconciled for suggestion + if instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume { + pvc, pv, err := r.DesiredVolume(instance) + if err != nil { + return err + } + + // Reconcile PVC and PV + _, _, err = r.reconcileVolume(pvc, pv) + if err != nil { + return err + } + + } + service, err := r.DesiredService(instance) if err != nil { return err @@ -217,6 +243,8 @@ func (r *ReconcileSuggestion) ReconcileSuggestion(instance *suggestionsv1beta1.S client.MatchingLabels(util.TrialLabels(experiment)), trials); err != nil { return err } + // TODO (andreyvelich): Do we want to run ValidateAlgorithmSettings when Experiment is restarting? + // Currently it is running. if !instance.IsRunning() { if err = r.ValidateAlgorithmSettings(instance, experiment); err != nil { logger.Error(err, "Marking suggestion failed as algorithm settings validation failed") @@ -226,7 +254,7 @@ func (r *ReconcileSuggestion) ReconcileSuggestion(instance *suggestionsv1beta1.S return nil } msg := "Suggestion is running" - instance.MarkSuggestionStatusRunning(SuggestionRunningReason, msg) + instance.MarkSuggestionStatusRunning(corev1.ConditionTrue, SuggestionRunningReason, msg) } logger.Info("Sync assignments", "suggestions", instance.Spec.Requests) if err = r.SyncAssignments(instance, experiment, trials.Items); err != nil { diff --git a/pkg/controller.v1beta1/suggestion/suggestion_controller_util.go b/pkg/controller.v1beta1/suggestion/suggestion_controller_util.go index 3b719579ef1..c0d14b438a5 100644 --- a/pkg/controller.v1beta1/suggestion/suggestion_controller_util.go +++ b/pkg/controller.v1beta1/suggestion/suggestion_controller_util.go @@ -11,6 +11,8 @@ import ( "k8s.io/apimachinery/pkg/types" ) +// TODO (andreyvelich): Setup logger with suggestion name/namespace here + func (r *ReconcileSuggestion) reconcileDeployment(deploy *appsv1.Deployment) (*appsv1.Deployment, error) { foundDeploy := &appsv1.Deployment{} err := r.Get(context.TODO(), types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}, foundDeploy) @@ -37,6 +39,37 @@ func (r *ReconcileSuggestion) reconcileService(service *corev1.Service) (*corev1 return foundService, nil } +func (r *ReconcileSuggestion) reconcileVolume(pvc *corev1.PersistentVolumeClaim, pv *corev1.PersistentVolume) (*corev1.PersistentVolumeClaim, *corev1.PersistentVolume, error) { + foundPVC := &corev1.PersistentVolumeClaim{} + foundPV := &corev1.PersistentVolume{} + + // Try to find/create PV, if PV has to be created + if pv != nil { + err := r.Get(context.TODO(), types.NamespacedName{Name: pv.Name}, foundPV) + if err != nil && errors.IsNotFound(err) { + log.Info("Creating Persistent Volume", "name", pv.Name) + err = r.Create(context.TODO(), pv) + // Return only if Create was failed, otherwise try to find/create PVC + if err != nil { + return nil, nil, err + } + } else if err != nil { + return nil, nil, err + } + } + + // Try to find/create PVC + err := r.Get(context.TODO(), types.NamespacedName{Name: pvc.Name, Namespace: pvc.Namespace}, foundPVC) + if err != nil && errors.IsNotFound(err) { + log.Info("Creating Persistent Volume Claim", "namespace", pvc.Namespace, "name", pvc.Name) + err = r.Create(context.TODO(), pvc) + return nil, nil, err + } else if err != nil { + return nil, nil, err + } + return foundPVC, foundPV, nil +} + func (r *ReconcileSuggestion) deleteDeployment(instance *v1beta1.Suggestion) error { deploy, err := r.DesiredDeployment(instance) if err != nil { diff --git a/pkg/controller.v1beta1/util/suggestion.go b/pkg/controller.v1beta1/util/suggestion.go index f9713279f53..ec3fb62d49e 100644 --- a/pkg/controller.v1beta1/util/suggestion.go +++ b/pkg/controller.v1beta1/util/suggestion.go @@ -7,14 +7,26 @@ import ( "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" ) +// GetAlgorithmDeploymentName returns name for the suggestion's deployment func GetAlgorithmDeploymentName(s *suggestionsv1beta1.Suggestion) string { return s.Name + "-" + s.Spec.AlgorithmName } +// GetAlgorithmServiceName returns name for the suggestion's service func GetAlgorithmServiceName(s *suggestionsv1beta1.Suggestion) string { return s.Name + "-" + s.Spec.AlgorithmName } +// GetAlgorithmPersistentVolumeName returns name for the suggestion's PV +func GetAlgorithmPersistentVolumeName(s *suggestionsv1beta1.Suggestion) string { + return s.Name + "-" + s.Spec.AlgorithmName + "-" + s.Namespace +} + +// GetAlgorithmPersistentVolumeClaimName returns name for the suggestion's PVC +func GetAlgorithmPersistentVolumeClaimName(s *suggestionsv1beta1.Suggestion) string { + return s.Name + "-" + s.Spec.AlgorithmName +} + // GetAlgorithmEndpoint returns the endpoint of the algorithm service. func GetAlgorithmEndpoint(s *suggestionsv1beta1.Suggestion) string { serviceName := GetAlgorithmServiceName(s) diff --git a/pkg/webhook/v1beta1/experiment/validation_webhook.go b/pkg/webhook/v1beta1/experiment/validation_webhook.go index 1cf79f9178f..6a52092f0f6 100644 --- a/pkg/webhook/v1beta1/experiment/validation_webhook.go +++ b/pkg/webhook/v1beta1/experiment/validation_webhook.go @@ -24,6 +24,7 @@ import ( "net/http" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" ktypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/runtime/inject" @@ -89,6 +90,24 @@ func (v *experimentValidator) Handle(ctx context.Context, req types.Request) typ if err != nil { return admission.ErrorResponse(http.StatusBadRequest, err) } + + // By default we create PV with cluster local host path for each experiment with ResumePolicy = FromVolume. + // User should not submit new experiment if previous PV for the same experiment was not deleted. + // We unable to watch for the PV events in controller. + // Webhook forbids experiment creation until coresponding PV will be deleted. + if inst.Spec.ResumePolicy == experimentsv1beta1.FromVolume && oldInst == nil { + foundPV := &v1.PersistentVolume{} + PVName := inst.Name + "-" + inst.Namespace + err := v.client.Get(context.TODO(), ktypes.NamespacedName{Name: PVName}, foundPV) + if !errors.IsNotFound(err) { + returnError := fmt.Errorf("Cannot create the Experiment: %v in namespace: %v, PV: %v is not deleted", inst.Name, inst.Namespace, PVName) + if err != nil { + returnError = fmt.Errorf("Cannot create the Experiment: %v in namespace: %v, error: %v", inst.Name, inst.Namespace, err) + } + return admission.ErrorResponse(http.StatusBadRequest, returnError) + } + } + return admission.ValidationResponse(true, "") } diff --git a/pkg/webhook/v1beta1/experiment/validator/validator.go b/pkg/webhook/v1beta1/experiment/validator/validator.go index b645d1c9fa4..5a97cc150e9 100644 --- a/pkg/webhook/v1beta1/experiment/validator/validator.go +++ b/pkg/webhook/v1beta1/experiment/validator/validator.go @@ -135,6 +135,7 @@ func (g *DefaultValidator) validateResumePolicy(resume experimentsv1beta1.Resume "": "", experimentsv1beta1.NeverResume: "", experimentsv1beta1.LongRunning: "", + experimentsv1beta1.FromVolume: "", } if _, ok := validTypes[resume]; !ok { return fmt.Errorf("invalid ResumePolicyType %s", resume) diff --git a/test/scripts/v1beta1/run-never-resume.sh b/test/scripts/v1beta1/run-never-resume.sh index 797d13c169e..99e42e3d820 100755 --- a/test/scripts/v1beta1/run-never-resume.sh +++ b/test/scripts/v1beta1/run-never-resume.sh @@ -54,11 +54,11 @@ cd ${GO_DIR}/test/e2e/v1beta1 echo "Running e2e test for never resume experiment" export KUBECONFIG=$HOME/.kube/config -./run-e2e-experiment ../../../examples/v1beta1/never-resume-example.yaml +./run-e2e-experiment ../../../examples/v1beta1/resume-experiment/never-resume.yaml -kubectl -n kubeflow describe suggestion never-resume-example +kubectl -n kubeflow describe suggestion never-resume -kubectl -n kubeflow describe experiment never-resume-example -kubectl -n kubeflow delete experiment never-resume-example +kubectl -n kubeflow describe experiment never-resume +kubectl -n kubeflow delete experiment never-resume exit 0