Skip to content

Commit

Permalink
Resume Experiment from Volume (#1275)
Browse files Browse the repository at this point in the history
* Resume experiment from the PV

* Add comment

* Remove old api comments

* Change reason for Running suggestion

* Fix few comments

* Rename volume name like suggestion deployment

* Add corev1 to const
  • Loading branch information
andreyvelich authored Jul 27, 2020
1 parent 27658a7 commit 9a7d43c
Show file tree
Hide file tree
Showing 19 changed files with 415 additions and 37 deletions.
66 changes: 66 additions & 0 deletions examples/v1beta1/resume-experiment/from-volume-resume.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
apiVersion: "kubeflow.org/v1beta1"
kind: Experiment
metadata:
namespace: kubeflow
labels:
controller-tools.k8s.io: "1.0"
name: from-volume-resume
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: Validation-accuracy
additionalMetricNames:
- Train-accuracy
algorithm:
algorithmName: random
parallelTrialCount: 3
maxTrialCount: 12
maxFailedTrialCount: 3
resumePolicy: FromVolume
parameters:
- name: lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.03"
- name: num-layers
parameterType: int
feasibleSpace:
min: "2"
max: "5"
- name: optimizer
parameterType: categorical
feasibleSpace:
list:
- sgd
- adam
- ftrl
trialTemplate:
trialParameters:
- name: learningRate
description: Learning rate for the training model
reference: lr
- name: numberLayers
description: Number of training model layers
reference: num-layers
- name: optimizer
description: Training model optimizer (sdg, adam or ftrl)
reference: optimizer
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: docker.io/kubeflowkatib/mxnet-mnist
command:
- "python3"
- "/opt/mxnet-mnist/mnist.py"
- "--batch-size=64"
- "--lr=${trialParameters.learningRate}"
- "--num-layers=${trialParameters.numberLayers}"
- "--optimizer=${trialParameters.optimizer}"
restartPolicy: Never
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
namespace: kubeflow
labels:
controller-tools.k8s.io: "1.0"
name: never-resume-example
name: never-resume
spec:
objective:
type: maximize
Expand Down
2 changes: 2 additions & 0 deletions manifests/v1beta1/katib-controller/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ rules:
- secrets
- events
- namespaces
- persistentvolumes
- persistentvolumeclaims
verbs:
- "*"
- apiGroups:
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/controller/experiments/v1beta1/experiment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,21 @@ const (
ExperimentFailed ExperimentConditionType = "Failed"
)

// ResumePolicyType describes how the experiment should be resumed.
// Only one of the following resume policies may be specified.
// If none of the following policies is specified, the default one is LongRunning.
type ResumePolicyType string

const (
// NeverResume indicates that experiment can't be resumed.
NeverResume ResumePolicyType = "Never"
// LongRunning indicates that experiment's suggestion resources
// (deployment and service) are always running.
LongRunning ResumePolicyType = "LongRunning"
// FromVolume indicates that volume is attached to experiment's
// suggestion. Suggestion data can be retained in the volume.
// When experiment is succeeded suggestion deployment and service are deleted.
FromVolume ResumePolicyType = "FromVolume"
)

type ParameterSpec struct {
Expand Down
8 changes: 7 additions & 1 deletion pkg/apis/controller/suggestions/v1beta1/suggestion_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"

experiment "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1"
)

// SuggestionSpec defines the desired state of Suggestion
// SuggestionSpec defines the desired state of suggestion.
type SuggestionSpec struct {
// Name of the algorithm that suggestion is used.
AlgorithmName string `json:"algorithmName"`
// Number of suggestions requested
Requests int32 `json:"requests,omitempty"`
// Describes resuming policy which usually take effect after experiment terminated.
// Default value is LongRunning.
ResumePolicy experiment.ResumePolicyType `json:"resumePolicy,omitempty"`
}

// SuggestionStatus defines the observed state of Suggestion
Expand Down
30 changes: 23 additions & 7 deletions pkg/apis/controller/suggestions/v1beta1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,34 @@ func (suggestion *Suggestion) MarkSuggestionStatusCreated(reason, message string
suggestion.setCondition(SuggestionCreated, v1.ConditionTrue, reason, message)
}

func (suggestion *Suggestion) MarkSuggestionStatusRunning(reason, message string) {
//suggestion.removeCondition(SuggestionRestarting)
suggestion.setCondition(SuggestionRunning, v1.ConditionTrue, reason, message)
// MarkSuggestionStatusRunning sets suggestion Running status.
func (suggestion *Suggestion) MarkSuggestionStatusRunning(status v1.ConditionStatus, reason, message string) {
// When suggestion is restrating we need to remove succeeded status from suggestion.
// That should happen only when ResumePolicy = FromVolume
suggestion.removeCondition(SuggestionSucceeded)
suggestion.setCondition(SuggestionRunning, status, reason, message)
}

// MarkSuggestionStatusSucceeded sets suggestion Succeeded status to true.
// Suggestion can be succeeded only if ResumeExperiment = Never or ResumeExperiment = FromVolume
func (suggestion *Suggestion) MarkSuggestionStatusSucceeded(reason, message string) {
currentCond := getCondition(suggestion, SuggestionRunning)
if currentCond != nil {
suggestion.setCondition(SuggestionRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message)

// When suggestion is Succeeded suggestion Running status is false
runningCond := getCondition(suggestion, SuggestionRunning)
succeededReason := "Suggestion is succeeded"
if runningCond != nil {
msg := "Suggestion is not running"
suggestion.setCondition(SuggestionRunning, v1.ConditionFalse, succeededReason, msg)
}

// When suggestion is Succeeded suggestion DeploymentReady status is false
deploymentReadyCond := getCondition(suggestion, SuggestionDeploymentReady)
if deploymentReadyCond != nil {
msg := "Deployment is not ready"
suggestion.setCondition(SuggestionDeploymentReady, v1.ConditionFalse, succeededReason, msg)
}
suggestion.setCondition(SuggestionSucceeded, v1.ConditionTrue, reason, message)

suggestion.setCondition(SuggestionSucceeded, v1.ConditionTrue, reason, message)
}

func (suggestion *Suggestion) MarkSuggestionStatusFailed(reason, message string) {
Expand Down
21 changes: 21 additions & 0 deletions pkg/controller.v1beta1/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package consts
import (
"time"

corev1 "k8s.io/api/core/v1"

"github.com/kubeflow/katib/pkg/util/v1beta1/env"
)

Expand Down Expand Up @@ -147,6 +149,25 @@ const (

// UnavailableMetricValue is the value when metric was not reported or metric value can't be converted to float64
UnavailableMetricValue = "unavailable"

// DefaultSuggestionVolumeLocalPathPrefix is the default cluster local path prefix for suggestion volume
// Full local path = /tmp/katib/suggestions/<suggestion-name>-<suggestion-namespace>
DefaultSuggestionVolumeLocalPathPrefix = "/tmp/katib/suggestions/"

// DefaultSuggestionStorageClassName is the default value for suggestion's volume storage class name
DefaultSuggestionStorageClassName = "katib-suggestion"

// DefaultSuggestionVolumeAccessMode is the default value for suggestion's volume access mode
DefaultSuggestionVolumeAccessMode = corev1.ReadWriteOnce

// DefaultSuggestionVolumeStorage is the default value for suggestion's volume storage
DefaultSuggestionVolumeStorage = "1Gi"

// ContainerSuggestionVolumeName is the volume name that mounted on suggestion container
ContainerSuggestionVolumeName = "suggestion-volume"

// DefaultContainerSuggestionVolumeMountPath is the default mount path in suggestion container
DefaultContainerSuggestionVolumeMountPath = "/opt/katib/data"
)

var (
Expand Down
36 changes: 24 additions & 12 deletions pkg/controller.v1beta1/experiment/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

if err = addWatch(mgr, c); err != nil {
log.Error(err, "Trial watch failed")
log.Error(err, "addWatch failed")
return err
}

Expand Down Expand Up @@ -187,27 +187,39 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re
}

if instance.IsCompleted() {
// Cleanup suggestion after Experiment is finished.
// If ResumePolicy = Never or ResumePolicy = FromVolume, delete suggestion deployment, service and mark suggestion status succeeded.
if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume || instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume {
err := r.cleanupSuggestionResources(instance)
if err != nil {
logger.Error(err, "cleanupSuggestionResources error")
return reconcile.Result{}, err
}
}
// Check if completed instance is restartable
// Experiment is restartable only if it is in succeeded state by reaching max trials
// And Resume Policy is LongRunning
// Experiment is restartable only if it is in succeeded state by reaching max trials and
// ResumePolicy = LongRunning or ResumePolicy = FromVolume
if util.IsCompletedExperimentRestartable(instance) {
// Check if max trials is reconfigured
if (instance.Spec.MaxTrialCount != nil &&
*instance.Spec.MaxTrialCount != instance.Status.Trials) ||
(instance.Spec.MaxTrialCount == nil && instance.Status.Trials != 0) {
logger.Info("Experiment is restarting")
logger.Info("Experiment is restarting",
"MaxTrialCount", instance.Spec.MaxTrialCount,
"ParallelTrialCount", instance.Spec.ParallelTrialCount,
"MaxFailedTrialCount", instance.Spec.MaxFailedTrialCount)
msg := "Experiment is restarted"
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
}
} else {
// Terminate Suggestion after Experiment is finished if Resume Policy is Never
if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume {
err := r.terminateSuggestion(instance)
if err != nil {
logger.Error(err, "Terminate Suggestion error")
// If ResumePolicy = FromVolume, suggestion must remove succeeded status and add running status when restarting
if instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume {
err := r.restartSuggestion(instance)
if err != nil {
logger.Error(err, "restartSuggestion error")
return reconcile.Result{}, err
}
}
return reconcile.Result{}, err
}
} else {
// If experiment is completed with no running trials, stop reconcile
if !instance.HasRunningTrials() {
return reconcile.Result{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestCreateExperiment(t *testing.T) {
mgrStopped.Wait()
}()

// Create the Trial object and expect the Reconcile and Deployment to be created
// Create the experiment object and expect the Reconcile and Deployment to be created
err = c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestReconcileExperiment(t *testing.T) {
mgrStopped.Wait()
}()

// Create the Trial object and expect the Reconcile and Deployment to be created
// Create the experiment object and expect the Reconcile and Deployment to be created
err = c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
Expand Down
51 changes: 45 additions & 6 deletions pkg/controller.v1beta1/experiment/experiment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (

"k8s.io/apimachinery/pkg/api/errors"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1"
suggestionsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
suggestionController "github.com/kubeflow/katib/pkg/controller.v1beta1/suggestion"
"github.com/kubeflow/katib/pkg/controller.v1beta1/util"
)

Expand Down Expand Up @@ -103,7 +103,8 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1beta1.Expe
}
}

func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1beta1.Experiment) error {
func (r *ReconcileExperiment) cleanupSuggestionResources(instance *experimentsv1beta1.Experiment) error {
logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
original := &suggestionsv1beta1.Suggestion{}
err := r.Get(context.TODO(),
types.NamespacedName{Namespace: instance.GetNamespace(), Name: instance.GetName()}, original)
Expand All @@ -113,15 +114,53 @@ func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1beta1.E
}
return err
}

// If Suggestion is failed or Suggestion is Succeeded, not needed to terminate Suggestion
if original.IsFailed() || original.IsSucceeded() {
return nil
}
log.Info("Start terminating suggestion")

logger.Info("Start cleanup suggestion resources")
suggestion := original.DeepCopy()

reason := "Experiment is succeeded"
// If ResumePolicy = Never, mark suggestion status succeeded, can't be restarted
if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume {
msg := "Suggestion is succeeded, can't be restarted"
suggestion.MarkSuggestionStatusSucceeded(reason, msg)
logger.Info("Mark suggestion succeeded, can't be restarted")

// If ResumePolicy = FromVolume, mark suggestion status succeeded, can be restarted
} else if instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume {
msg := "Suggestion is succeeded, suggestion volume is not deleted, can be restarted"
suggestion.MarkSuggestionStatusSucceeded(reason, msg)
logger.Info("Mark suggestion succeeded, can be restarted")
}

if err := r.UpdateSuggestionStatus(suggestion); err != nil {
return err
}
return nil
}

func (r *ReconcileExperiment) restartSuggestion(instance *experimentsv1beta1.Experiment) error {
logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
original := &suggestionsv1beta1.Suggestion{}
err := r.Get(context.TODO(),
types.NamespacedName{Namespace: instance.GetNamespace(), Name: instance.GetName()}, original)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}

logger.Info("Suggestion is restarting, suggestion Running status is false")
suggestion := original.DeepCopy()
msg := "Suggestion is succeeded"
suggestion.MarkSuggestionStatusSucceeded(suggestionController.SuggestionSucceededReason, msg)
log.Info("Mark suggestion succeeded")
reason := "Experiment is restarting"
msg := "Suggestion is not running"
// Mark suggestion status not running because experiment is restarting and suggestion deployment is not ready
suggestion.MarkSuggestionStatusRunning(corev1.ConditionFalse, reason, msg)

if err := r.UpdateSuggestionStatus(suggestion); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/controller.v1beta1/experiment/suggestion/suggestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (g *General) createSuggestion(instance *experimentsv1beta1.Experiment, sugg
Spec: suggestionsv1beta1.SuggestionSpec{
AlgorithmName: instance.Spec.Algorithm.AlgorithmName,
Requests: suggestionRequests,
ResumePolicy: instance.Spec.ResumePolicy,
},
}

Expand Down
Loading

0 comments on commit 9a7d43c

Please sign in to comment.