Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume Experiment from Volume #1275

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions examples/v1beta1/resume-experiment/from-volume-resume.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
apiVersion: "kubeflow.org/v1beta1"
kind: Experiment
metadata:
namespace: kubeflow
labels:
controller-tools.k8s.io: "1.0"
name: from-volume-resume
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: Validation-accuracy
additionalMetricNames:
- Train-accuracy
algorithm:
algorithmName: random
parallelTrialCount: 3
maxTrialCount: 12
maxFailedTrialCount: 3
resumePolicy: FromVolume
parameters:
- name: lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.03"
- name: num-layers
parameterType: int
feasibleSpace:
min: "2"
max: "5"
- name: optimizer
parameterType: categorical
feasibleSpace:
list:
- sgd
- adam
- ftrl
trialTemplate:
trialParameters:
- name: learningRate
description: Learning rate for the training model
reference: lr
- name: numberLayers
description: Number of training model layers
reference: num-layers
- name: optimizer
description: Training model optimizer (sgd, adam or ftrl)
reference: optimizer
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: docker.io/kubeflowkatib/mxnet-mnist
command:
- "python3"
- "/opt/mxnet-mnist/mnist.py"
- "--batch-size=64"
- "--lr=${trialParameters.learningRate}"
- "--num-layers=${trialParameters.numberLayers}"
- "--optimizer=${trialParameters.optimizer}"
restartPolicy: Never
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
namespace: kubeflow
labels:
controller-tools.k8s.io: "1.0"
name: never-resume-example
name: never-resume
spec:
objective:
type: maximize
Expand Down
2 changes: 2 additions & 0 deletions manifests/v1beta1/katib-controller/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ rules:
- secrets
- events
- namespaces
- persistentvolumes
- persistentvolumeclaims
verbs:
- "*"
- apiGroups:
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/controller/experiments/v1beta1/experiment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,21 @@ const (
ExperimentFailed ExperimentConditionType = "Failed"
)

// ResumePolicyType describes how the experiment should be resumed.
// Only one of the following resume policies may be specified.
// If none of the following policies is specified, the default one is LongRunning.
type ResumePolicyType string

const (
// NeverResume indicates that experiment can't be resumed.
NeverResume ResumePolicyType = "Never"
// LongRunning indicates that experiment's suggestion resources
// (deployment and service) are always running.
LongRunning ResumePolicyType = "LongRunning"
// FromVolume indicates that a volume is attached to the experiment's
// suggestion. Suggestion data can be retained in the volume.
// When the experiment has succeeded, the suggestion deployment and service are deleted.
FromVolume ResumePolicyType = "FromVolume"
)

type ParameterSpec struct {
Expand Down
8 changes: 7 additions & 1 deletion pkg/apis/controller/suggestions/v1beta1/suggestion_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"

experiment "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1"
)

// SuggestionSpec defines the desired state of Suggestion
// SuggestionSpec defines the desired state of suggestion.
type SuggestionSpec struct {
// Name of the algorithm that the suggestion uses.
AlgorithmName string `json:"algorithmName"`
// Number of suggestions requested
Requests int32 `json:"requests,omitempty"`
// Describes the resume policy, which usually takes effect after the experiment has terminated.
// Default value is LongRunning.
ResumePolicy experiment.ResumePolicyType `json:"resumePolicy,omitempty"`
}

// SuggestionStatus defines the observed state of Suggestion
Expand Down
30 changes: 23 additions & 7 deletions pkg/apis/controller/suggestions/v1beta1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,34 @@ func (suggestion *Suggestion) MarkSuggestionStatusCreated(reason, message string
suggestion.setCondition(SuggestionCreated, v1.ConditionTrue, reason, message)
}

func (suggestion *Suggestion) MarkSuggestionStatusRunning(reason, message string) {
//suggestion.removeCondition(SuggestionRestarting)
suggestion.setCondition(SuggestionRunning, v1.ConditionTrue, reason, message)
// MarkSuggestionStatusRunning sets suggestion Running status.
func (suggestion *Suggestion) MarkSuggestionStatusRunning(status v1.ConditionStatus, reason, message string) {
// When suggestion is restarting we need to remove succeeded status from suggestion.
// That should happen only when ResumePolicy = FromVolume
suggestion.removeCondition(SuggestionSucceeded)
suggestion.setCondition(SuggestionRunning, status, reason, message)
}

// MarkSuggestionStatusSucceeded sets suggestion Succeeded status to true.
// Suggestion can be succeeded only if ResumePolicy = Never or ResumePolicy = FromVolume
func (suggestion *Suggestion) MarkSuggestionStatusSucceeded(reason, message string) {
currentCond := getCondition(suggestion, SuggestionRunning)
if currentCond != nil {
suggestion.setCondition(SuggestionRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message)

// When suggestion is Succeeded suggestion Running status is false
runningCond := getCondition(suggestion, SuggestionRunning)
succeededReason := "Suggestion is succeeded"
if runningCond != nil {
msg := "Suggestion is not running"
suggestion.setCondition(SuggestionRunning, v1.ConditionFalse, succeededReason, msg)
}

// When suggestion is Succeeded suggestion DeploymentReady status is false
deploymentReadyCond := getCondition(suggestion, SuggestionDeploymentReady)
if deploymentReadyCond != nil {
msg := "Deployment is not ready"
suggestion.setCondition(SuggestionDeploymentReady, v1.ConditionFalse, succeededReason, msg)
}
suggestion.setCondition(SuggestionSucceeded, v1.ConditionTrue, reason, message)

suggestion.setCondition(SuggestionSucceeded, v1.ConditionTrue, reason, message)
}

func (suggestion *Suggestion) MarkSuggestionStatusFailed(reason, message string) {
Expand Down
21 changes: 21 additions & 0 deletions pkg/controller.v1beta1/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package consts
import (
"time"

corev1 "k8s.io/api/core/v1"

"github.com/kubeflow/katib/pkg/util/v1beta1/env"
)

Expand Down Expand Up @@ -147,6 +149,25 @@ const (

// UnavailableMetricValue is the value when metric was not reported or metric value can't be converted to float64
UnavailableMetricValue = "unavailable"

// DefaultSuggestionVolumeLocalPathPrefix is the default cluster local path prefix for suggestion volume
// Full local path = /tmp/katib/suggestions/<suggestion-name>-<suggestion-namespace>
DefaultSuggestionVolumeLocalPathPrefix = "/tmp/katib/suggestions/"

// DefaultSuggestionStorageClassName is the default value for suggestion's volume storage class name
DefaultSuggestionStorageClassName = "katib-suggestion"

// DefaultSuggestionVolumeAccessMode is the default value for suggestion's volume access mode
DefaultSuggestionVolumeAccessMode = corev1.ReadWriteOnce

// DefaultSuggestionVolumeStorage is the default value for suggestion's volume storage
DefaultSuggestionVolumeStorage = "1Gi"

// ContainerSuggestionVolumeName is the name of the volume that is mounted on the suggestion container
ContainerSuggestionVolumeName = "suggestion-volume"

// DefaultContainerSuggestionVolumeMountPath is the default mount path in suggestion container
DefaultContainerSuggestionVolumeMountPath = "/opt/katib/data"
)

var (
Expand Down
36 changes: 24 additions & 12 deletions pkg/controller.v1beta1/experiment/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

if err = addWatch(mgr, c); err != nil {
log.Error(err, "Trial watch failed")
log.Error(err, "addWatch failed")
return err
}

Expand Down Expand Up @@ -187,27 +187,39 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re
}

if instance.IsCompleted() {
// Cleanup suggestion after Experiment is finished.
// If ResumePolicy = Never or ResumePolicy = FromVolume, delete suggestion deployment, service and mark suggestion status succeeded.
if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume || instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume {
err := r.cleanupSuggestionResources(instance)
if err != nil {
logger.Error(err, "cleanupSuggestionResources error")
return reconcile.Result{}, err
}
}
// Check if completed instance is restartable
// Experiment is restartable only if it is in succeeded state by reaching max trials
// And Resume Policy is LongRunning
// Experiment is restartable only if it is in succeeded state by reaching max trials and
// ResumePolicy = LongRunning or ResumePolicy = FromVolume
if util.IsCompletedExperimentRestartable(instance) {
// Check if max trials is reconfigured
if (instance.Spec.MaxTrialCount != nil &&
*instance.Spec.MaxTrialCount != instance.Status.Trials) ||
(instance.Spec.MaxTrialCount == nil && instance.Status.Trials != 0) {
logger.Info("Experiment is restarting")
logger.Info("Experiment is restarting",
"MaxTrialCount", instance.Spec.MaxTrialCount,
"ParallelTrialCount", instance.Spec.ParallelTrialCount,
"MaxFailedTrialCount", instance.Spec.MaxFailedTrialCount)
msg := "Experiment is restarted"
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
}
} else {
// Terminate Suggestion after Experiment is finished if Resume Policy is Never
if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume {
err := r.terminateSuggestion(instance)
if err != nil {
logger.Error(err, "Terminate Suggestion error")
// If ResumePolicy = FromVolume, suggestion must remove succeeded status and add running status when restarting
if instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume {
err := r.restartSuggestion(instance)
if err != nil {
logger.Error(err, "restartSuggestion error")
return reconcile.Result{}, err
}
}
return reconcile.Result{}, err
}
} else {
// If experiment is completed with no running trials, stop reconcile
if !instance.HasRunningTrials() {
return reconcile.Result{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestCreateExperiment(t *testing.T) {
mgrStopped.Wait()
}()

// Create the Trial object and expect the Reconcile and Deployment to be created
// Create the experiment object and expect the Reconcile and Deployment to be created
err = c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestReconcileExperiment(t *testing.T) {
mgrStopped.Wait()
}()

// Create the Trial object and expect the Reconcile and Deployment to be created
// Create the experiment object and expect the Reconcile and Deployment to be created
err = c.Create(context.TODO(), instance)
// The instance object may not be a valid object because it might be missing some required fields.
// Please modify the instance object by adding required fields and then remove the following if statement.
Expand Down
51 changes: 45 additions & 6 deletions pkg/controller.v1beta1/experiment/experiment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (

"k8s.io/apimachinery/pkg/api/errors"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

experimentsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1"
suggestionsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
suggestionController "github.com/kubeflow/katib/pkg/controller.v1beta1/suggestion"
"github.com/kubeflow/katib/pkg/controller.v1beta1/util"
)

Expand Down Expand Up @@ -103,7 +103,8 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1beta1.Expe
}
}

func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1beta1.Experiment) error {
func (r *ReconcileExperiment) cleanupSuggestionResources(instance *experimentsv1beta1.Experiment) error {
logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
original := &suggestionsv1beta1.Suggestion{}
err := r.Get(context.TODO(),
types.NamespacedName{Namespace: instance.GetNamespace(), Name: instance.GetName()}, original)
Expand All @@ -113,15 +114,53 @@ func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1beta1.E
}
return err
}

// If Suggestion is failed or succeeded, there is no need to terminate it
if original.IsFailed() || original.IsSucceeded() {
return nil
}
log.Info("Start terminating suggestion")

logger.Info("Start cleanup suggestion resources")
suggestion := original.DeepCopy()

reason := "Experiment is succeeded"
// If ResumePolicy = Never, mark suggestion status succeeded, can't be restarted
if instance.Spec.ResumePolicy == experimentsv1beta1.NeverResume {
msg := "Suggestion is succeeded, can't be restarted"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we add restart message when suggestion is terminated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for additional information after Suggestion is succeeded for the user.
@johnugeorge You think it is not necessary?

suggestion.MarkSuggestionStatusSucceeded(reason, msg)
logger.Info("Mark suggestion succeeded, can't be restarted")

// If ResumePolicy = FromVolume, mark suggestion status succeeded, can be restarted
} else if instance.Spec.ResumePolicy == experimentsv1beta1.FromVolume {
msg := "Suggestion is succeeded, suggestion volume is not deleted, can be restarted"
suggestion.MarkSuggestionStatusSucceeded(reason, msg)
logger.Info("Mark suggestion succeeded, can be restarted")
}

if err := r.UpdateSuggestionStatus(suggestion); err != nil {
return err
}
return nil
}

func (r *ReconcileExperiment) restartSuggestion(instance *experimentsv1beta1.Experiment) error {
logger := log.WithValues("Suggestion", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()})
original := &suggestionsv1beta1.Suggestion{}
err := r.Get(context.TODO(),
types.NamespacedName{Namespace: instance.GetNamespace(), Name: instance.GetName()}, original)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}

logger.Info("Suggestion is restarting, suggestion Running status is false")
suggestion := original.DeepCopy()
msg := "Suggestion is succeeded"
suggestion.MarkSuggestionStatusSucceeded(suggestionController.SuggestionSucceededReason, msg)
log.Info("Mark suggestion succeeded")
reason := "Experiment is restarting"
msg := "Suggestion is not running"
// Mark suggestion status not running because experiment is restarting and suggestion deployment is not ready
suggestion.MarkSuggestionStatusRunning(corev1.ConditionFalse, reason, msg)

if err := r.UpdateSuggestionStatus(suggestion); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/controller.v1beta1/experiment/suggestion/suggestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (g *General) createSuggestion(instance *experimentsv1beta1.Experiment, sugg
Spec: suggestionsv1beta1.SuggestionSpec{
AlgorithmName: instance.Spec.Algorithm.AlgorithmName,
Requests: suggestionRequests,
ResumePolicy: instance.Spec.ResumePolicy,
},
}

Expand Down
Loading