Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add bayesian #777

Merged
merged 15 commits into from
Sep 21, 2019
18 changes: 18 additions & 0 deletions cmd/suggestion/skopt/v1alpha3/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM python:3

gaocegege marked this conversation as resolved.
Show resolved Hide resolved
# Extra native toolchain and BLAS/LAPACK headers are installed only on ppc64le.
# NOTE(review): presumably the scientific Python packages must be built from
# source on that architecture - confirm.
RUN if [ "$(uname -m)" = "ppc64le" ]; then \
        apt-get -y update && \
        apt-get -y install gfortran libopenblas-dev liblapack-dev && \
        pip install cython; \
    fi

# Install the gRPC health probe binary at /bin/grpc_health_probe.
# NOTE(review): the linux-amd64 binary is downloaded unconditionally, even on
# the ppc64le path handled above - confirm whether a per-arch download is needed.
RUN GRPC_HEALTH_PROBE_VERSION=v0.3.0 && \
    wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
    chmod +x /bin/grpc_health_probe

# COPY rather than ADD: the source is a plain local directory, so ADD's
# URL-fetch / archive auto-extraction features are not wanted here.
COPY . /usr/src/app/github.com/kubeflow/katib
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/skopt/v1alpha3
RUN pip install --no-cache-dir -r requirements.txt

# Repository root plus the generated gRPC python packages must be importable by main.py.
ENV PYTHONPATH=/usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/health/python

ENTRYPOINT ["python", "main.py"]
Empty file.
26 changes: 26 additions & 0 deletions cmd/suggestion/skopt/v1alpha3/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import grpc
import time
from pkg.apis.manager.v1alpha3.python import api_pb2_grpc
from pkg.apis.manager.health.python import health_pb2_grpc
from pkg.suggestion.v1alpha3.skopt_service import SkoptService
from concurrent import futures

# Sleep interval used by the keep-alive loop in serve(); the main thread only
# wakes up once per day while the gRPC worker threads handle requests.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
# Despite the name, this is a full "host:port" listen address; it is passed
# as-is to server.add_insecure_port() in serve().
DEFAULT_PORT = "0.0.0.0:6789"

def serve():
    """
    Start the skopt suggestion gRPC server and block until interrupted.

    A single SkoptService instance is registered both as the Suggestion
    servicer and as the Health servicer. The server listens on DEFAULT_PORT
    through an insecure (non-TLS) port and is stopped immediately, with no
    grace period, on KeyboardInterrupt.
    """
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    skopt_service = SkoptService()

    # The same object answers both suggestion and health-check RPCs.
    api_pb2_grpc.add_SuggestionServicer_to_server(skopt_service, grpc_server)
    health_pb2_grpc.add_HealthServicer_to_server(skopt_service, grpc_server)

    grpc_server.add_insecure_port(DEFAULT_PORT)
    print("Listening...")
    grpc_server.start()

    # start() does not block, so park the main thread until Ctrl-C arrives.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)

if __name__ == "__main__":
    # Only start the server when this file is executed as a script.
    serve()
10 changes: 10 additions & 0 deletions cmd/suggestion/skopt/v1alpha3/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
grpcio==1.23.0
duecredit===0.7.0
cloudpickle==0.5.6
numpy>=1.13.3
scikit-learn>=0.19.0
scipy>=0.19.1
forestci==0.3
protobuf==3.9.1
googleapis-common-protos==1.6.0
scikit-optimize==0.5.2
65 changes: 65 additions & 0 deletions examples/v1alpha3/skopt-bayesian-optimization-example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
apiVersion: "kubeflow.org/v1alpha3"
kind: Experiment
metadata:
namespace: kubeflow
labels:
controller-tools.k8s.io: "1.0"
name: skopt-bayesian-optimization-example
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: Validation-accuracy
additionalMetricNames:
- accuracy
algorithm:
algorithmName: skopt-bayesian-optimization
algorithmSettings:
- name: "random_state"
value: "10"
parallelTrialCount: 3
maxTrialCount: 12
maxFailedTrialCount: 3
parameters:
- name: --lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.03"
- name: --num-layers
parameterType: int
feasibleSpace:
min: "2"
max: "5"
- name: --optimizer
parameterType: categorical
feasibleSpace:
list:
- sgd
- adam
- ftrl
trialTemplate:
goTemplate:
rawTemplate: |-
apiVersion: batch/v1
kind: Job
metadata:
name: {{.Trial}}
namespace: {{.NameSpace}}
spec:
template:
spec:
serviceAccountName: metrics-collector # will be dropped
containers:
- name: {{.Trial}}
image: docker.io/katib/mxnet-mnist-example
command:
- "python"
- "/mxnet/example/image-classification/train_mnist.py"
- "--batch-size=64"
{{- with .HyperParameters}}
{{- range .}}
- "{{.Name}}={{.Value}}"
{{- end}}
{{- end}}
restartPolicy: Never
3 changes: 3 additions & 0 deletions manifests/v1alpha3/katib-controller/katib-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ data:
},
"hyperopt-random": {
"image": "gcr.io/kubeflow-images-public/katib/v1alpha3/suggestion-hyperopt"
},
"skopt-bayesian-optimization": {
"image": "gcr.io/kubeflow-images-public/katib/v1alpha3/suggestion-skopt"
}
}
3 changes: 3 additions & 0 deletions pkg/controller.v1alpha3/suggestion/composer/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

const (
// defaultInitialDelaySeconds is the InitialDelaySeconds value set on the
// health probes built in desiredContainer.
defaultInitialDelaySeconds = 10
// defaultPeriod is the PeriodSeconds value set on the same probes.
defaultPeriod = 10
// defaultGRPCHealthCheckProbe is the path of the grpc_health_probe binary
// that the probes execute.
// Ref https://github.com/grpc-ecosystem/grpc-health-probe/
defaultGRPCHealthCheckProbe = "/bin/grpc_health_probe"
)
Expand Down Expand Up @@ -133,6 +134,7 @@ func (g *General) desiredContainer(s *suggestionsv1alpha3.Suggestion) (*corev1.C
},
},
InitialDelaySeconds: defaultInitialDelaySeconds,
PeriodSeconds: defaultPeriod,
}
c.LivenessProbe = &corev1.Probe{
Handler: corev1.Handler{
Expand All @@ -145,6 +147,7 @@ func (g *General) desiredContainer(s *suggestionsv1alpha3.Suggestion) (*corev1.C
},
},
InitialDelaySeconds: defaultInitialDelaySeconds,
PeriodSeconds: defaultPeriod,
}
return c, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ func (r *ReconcileSuggestion) ReconcileSuggestion(instance *suggestionsv1alpha3.
}
msg := "Suggestion is running"
instance.MarkSuggestionStatusRunning(SuggestionRunningReason, msg)
return nil
}
logger.Info("Sync assignments", "suggestions", instance.Spec.Requests)
if err = r.SyncAssignments(instance, experiment, trials.Items); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
)

var log = logf.Log.WithName("suggestion-client")
var (
// log is the logger for this package, named "suggestion-client".
log = logf.Log.WithName("suggestion-client")
// timeout bounds each gRPC call made to the suggestion service; it is the
// deadline of the contexts created in SyncAssignments and
// ValidateAlgorithmSettings.
timeout = 60 * time.Second
)

type SuggestionClient interface {
SyncAssignments(instance *suggestionsv1alpha3.Suggestion, e *experimentsv1alpha3.Experiment,
Expand Down Expand Up @@ -54,7 +57,7 @@ func (g *General) SyncAssignments(
defer conn.Close()

client := suggestionapi.NewSuggestionClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

request := &suggestionapi.GetSuggestionsRequest{
Expand Down Expand Up @@ -94,7 +97,7 @@ func (g *General) ValidateAlgorithmSettings(instance *suggestionsv1alpha3.Sugges
defer conn.Close()

client := suggestionapi.NewSuggestionClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

request := &suggestionapi.ValidateAlgorithmSettingsRequest{
Expand Down
97 changes: 97 additions & 0 deletions pkg/suggestion/v1alpha3/skopt/base_skopt_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import json
import numpy as np
import skopt
import logging

from pkg.suggestion.v1alpha3.internal.search_space import *
from pkg.suggestion.v1alpha3.internal.trial import *

logger = logging.getLogger("BaseSkoptService")


class BaseSkoptService(object):
    """
    Suggest hyperparameter assignments with scikit-optimize.

    Refer to https://github.com/scikit-optimize/scikit-optimize .

    The instance only stores optimizer settings. Every call to
    ``getSuggestions`` builds a fresh ``skopt.Optimizer``, replays the given
    trials into it and then asks it for new points.
    """

    def __init__(self, algorithm_name="skopt-bayesian-optimization",
                 base_estimator="GP",
                 n_initial_points=10,
                 acq_func="gp_hedge",
                 acq_optimizer="auto",
                 random_state=None):
        """
        Store the settings that are later forwarded to ``skopt.Optimizer``.

        :param algorithm_name: algorithm requested by the experiment; only
            "skopt-bayesian-optimization" is accepted by ``getSuggestions``.
        :param base_estimator: forwarded as ``base_estimator``.
        :param n_initial_points: forwarded as ``n_initial_points``.
        :param acq_func: forwarded as ``acq_func``.
        :param acq_optimizer: forwarded as ``acq_optimizer``.
        :param random_state: forwarded as ``random_state``.
        """
        self.base_estimator = base_estimator
        self.n_initial_points = n_initial_points
        self.acq_func = acq_func
        self.acq_optimizer = acq_optimizer
        self.random_state = random_state
        self.algorithm_name = algorithm_name

    def getSuggestions(self, search_space, trials, request_number):
        """
        Get the new suggested trials with skopt algorithm.

        :param search_space: object exposing ``params`` (each with ``name``,
            ``type`` and either ``min``/``max`` or ``list``) and ``goal``.
        :param trials: iterable of evaluated trials, each exposing
            ``assignments`` (``name``/``value`` pairs) and
            ``target_metric.value``.
        :param request_number: how many suggestions to produce.
        :return: list with one entry per suggestion; each entry is the list
            of ``Assignment`` objects built by ``convert``.
        :raises Exception: if ``algorithm_name`` is not
            "skopt-bayesian-optimization".
        """
        # Fail fast: reject an unsupported algorithm before doing any work.
        if self.algorithm_name != "skopt-bayesian-optimization":
            raise Exception("Algorithm name is not supported by skopt service.")

        skopt_optimizer = skopt.Optimizer(
            BaseSkoptService._build_dimensions(search_space),
            base_estimator=self.base_estimator,
            n_initial_points=self.n_initial_points,
            acq_func=self.acq_func,
            acq_optimizer=self.acq_optimizer,
            random_state=self.random_state)

        # Replay every evaluated trial so the optimizer sees the full history.
        for trial in trials:
            skopt_optimizer.tell(
                BaseSkoptService._trial_point(search_space, trial),
                BaseSkoptService._trial_loss(search_space, trial))

        # ask(n_points=...) rejects non-positive counts; nothing to suggest.
        if request_number <= 0:
            return []

        # Ask for all points in one call. Calling ask() repeatedly without an
        # intervening tell() returns the same point once the initial random
        # points are used up, which would make parallel trials duplicates.
        suggested_points = skopt_optimizer.ask(n_points=int(request_number))
        return [BaseSkoptService.convert(search_space, point)
                for point in suggested_points]

    @staticmethod
    def _build_dimensions(search_space):
        """Translate the search-space parameters into skopt dimensions."""
        dimensions = []
        for param in search_space.params:
            if param.type == INTEGER:
                dimensions.append(skopt.space.Integer(
                    int(param.min), int(param.max), name=param.name))
            elif param.type == DOUBLE:
                dimensions.append(skopt.space.Real(
                    float(param.min), float(param.max), "log-uniform",
                    name=param.name))
            elif param.type == CATEGORICAL or param.type == DISCRETE:
                dimensions.append(
                    skopt.space.Categorical(param.list, name=param.name))
        return dimensions

    @staticmethod
    def _trial_point(search_space, trial):
        """Return the trial's parameter values ordered like search_space.params."""
        point = []
        for param in search_space.params:
            # First assignment whose name matches; None when the trial has none.
            value = next(
                (assignment.value for assignment in trial.assignments
                 if assignment.name == param.name),
                None)
            if param.type == INTEGER:
                value = int(value)
            elif param.type == DOUBLE:
                value = float(value)
            point.append(value)
        return point

    @staticmethod
    def _trial_loss(search_space, trial):
        """Return the trial's objective as a value for skopt to minimize."""
        loss = float(trial.target_metric.value)
        # The sign is flipped when the experiment's goal is MAX_GOAL.
        if search_space.goal == MAX_GOAL:
            loss = -1 * loss
        return loss

    @staticmethod
    def convert(search_space, skopt_suggested):
        """
        Pair one suggested point with the parameter names.

        :param search_space: object exposing ``params``.
        :param skopt_suggested: sequence of values, indexed by position in
            ``search_space.params``.
        :return: list of ``Assignment(name, value)``; parameters whose type
            is not INTEGER, DOUBLE, CATEGORICAL or DISCRETE are omitted.
        """
        assignments = []
        for i, param in enumerate(search_space.params):
            if param.type in (INTEGER, DOUBLE, CATEGORICAL, DISCRETE):
                assignments.append(Assignment(param.name, skopt_suggested[i]))
        return assignments
81 changes: 81 additions & 0 deletions pkg/suggestion/v1alpha3/skopt_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging

from pkg.apis.manager.v1alpha3.python import api_pb2
from pkg.apis.manager.v1alpha3.python import api_pb2_grpc
from pkg.suggestion.v1alpha3.internal.search_space import HyperParameter, HyperParameterSearchSpace
from pkg.suggestion.v1alpha3.internal.trial import Trial, Assignment
from pkg.suggestion.v1alpha3.skopt.base_skopt_service import BaseSkoptService
from pkg.suggestion.v1alpha3.base_health_service import HealthServicer


logger = logging.getLogger("SkoptService")


class SkoptService(
        api_pb2_grpc.SuggestionServicer, HealthServicer):
    """Suggestion servicer that delegates to BaseSkoptService."""

    def GetSuggestions(self, request, context):
        """
        Main function to provide suggestion.

        Reads the optimizer settings from the experiment's algorithm spec,
        converts the experiment and its trials to the internal
        representation and returns the new assignments in a
        GetSuggestionsReply. ``context`` is not used.
        """
        algorithm_name, settings = OptimizerConfiguration.convertAlgorithmSpec(
            request.experiment.spec.algorithm)
        skopt_service = BaseSkoptService(
            algorithm_name=algorithm_name,
            base_estimator=settings.base_estimator,
            n_initial_points=settings.n_initial_points,
            acq_func=settings.acq_func,
            acq_optimizer=settings.acq_optimizer,
            random_state=settings.random_state)

        space = HyperParameterSearchSpace.convert(request.experiment)
        evaluated_trials = Trial.convert(request.trials)
        suggestions = skopt_service.getSuggestions(
            space, evaluated_trials, request.request_number)

        reply = api_pb2.GetSuggestionsReply(
            parameter_assignments=Assignment.generate(suggestions)
        )
        return reply


class OptimizerConfiguration(object):
    """Holder for the optimizer settings read from an algorithm spec."""

    def __init__(self, base_estimator="GP",
                 n_initial_points=10,
                 acq_func="gp_hedge",
                 acq_optimizer="auto",
                 random_state=None):
        """Store each setting under an attribute of the same name."""
        self.base_estimator = base_estimator
        self.n_initial_points = n_initial_points
        self.acq_func = acq_func
        self.acq_optimizer = acq_optimizer
        self.random_state = random_state

    @staticmethod
    def convertAlgorithmSpec(algorithm_spec):
        """
        Build a configuration from an algorithm spec.

        Starts from the defaults and overrides them with the entries of
        ``algorithm_spec.algorithm_setting`` whose ``name`` is recognised;
        other entries are ignored. ``n_initial_points`` and ``random_state``
        are parsed with ``int``; the remaining values are kept as given.

        :return: tuple ``(algorithm_spec.algorithm_name, configuration)``.
        """
        # Recognised setting name -> converter (None keeps the raw value).
        converters = {
            "base_estimator": None,
            "n_initial_points": int,
            "acq_func": None,
            "acq_optimizer": None,
            "random_state": int,
        }
        configuration = OptimizerConfiguration()
        for setting in algorithm_spec.algorithm_setting:
            if setting.name not in converters:
                continue
            cast = converters[setting.name]
            parsed = setting.value if cast is None else cast(setting.value)
            setattr(configuration, setting.name, parsed)
        return algorithm_spec.algorithm_name, configuration


class HyperoptService(
        api_pb2_grpc.SuggestionServicer):
    """
    Suggestion servicer that delegates to BaseHyperoptService.

    NOTE(review): ``BaseHyperoptService`` is not among this module's imports,
    so ``GetSuggestions`` would presumably fail with NameError when called.
    This class looks like a leftover copy of the hyperopt service - confirm
    and either import the name or remove the class.
    """

    def GetSuggestions(self, request, context):
        """
        Main function to provide suggestion.

        Converts the experiment and its trials to the internal
        representation and returns the new assignments in a
        GetSuggestionsReply. ``context`` is not used.
        """
        hyperopt_service = BaseHyperoptService(
            algorithm_name=request.experiment.spec.algorithm.algorithm_name)

        space = HyperParameterSearchSpace.convert(request.experiment)
        evaluated_trials = Trial.convert(request.trials)
        suggestions = hyperopt_service.getSuggestions(
            space, evaluated_trials, request.request_number)

        reply = api_pb2.GetSuggestionsReply(
            parameter_assignments=Assignment.generate(suggestions)
        )
        return reply
7 changes: 4 additions & 3 deletions scripts/v1alpha3/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ SCRIPT_ROOT=$(dirname ${BASH_SOURCE})/../..

cd ${SCRIPT_ROOT}

echo "Building suggestion images..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/suggestion-hyperopt -f ${CMD_PREFIX}/suggestion/hyperopt/v1alpha3/Dockerfile .

echo "Building core image..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/katib-controller -f ${CMD_PREFIX}/katib-controller/v1alpha3/Dockerfile .
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/katib-manager -f ${CMD_PREFIX}/manager/v1alpha3/Dockerfile .
Expand All @@ -42,3 +39,7 @@ docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/file-metrics-collector -f ${CMD_P

echo "Building TF Event metrics collector image..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/tfevent-metrics-collector -f ${CMD_PREFIX}/metricscollector/v1alpha3/tfevent-metricscollector/Dockerfile .

echo "Building suggestion images..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/suggestion-hyperopt -f ${CMD_PREFIX}/suggestion/hyperopt/v1alpha3/Dockerfile .
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/suggestion-skopt -f ${CMD_PREFIX}/suggestion/skopt/v1alpha3/Dockerfile .
Loading