From 173992da8ec7eb001385bef1c65deda16de7b33e Mon Sep 17 00:00:00 2001 From: Ce Gao Date: Fri, 20 Sep 2019 10:53:56 +0800 Subject: [PATCH 1/2] feat: Add health check in suggestions Signed-off-by: Ce Gao --- cmd/suggestion/hyperopt/v1alpha3/Dockerfile | 11 +- cmd/suggestion/hyperopt/v1alpha3/main.py | 5 +- pkg/controller.v1alpha3/consts/const.go | 4 +- .../suggestion/composer/composer.go | 39 +++- .../v1alpha3/base_health_service.py | 167 ++++++++++++++++++ pkg/suggestion/v1alpha3/hyperopt_service.py | 5 +- scripts/v1alpha3/build.sh | 6 +- 7 files changed, 227 insertions(+), 10 deletions(-) create mode 100644 pkg/suggestion/v1alpha3/base_health_service.py diff --git a/cmd/suggestion/hyperopt/v1alpha3/Dockerfile b/cmd/suggestion/hyperopt/v1alpha3/Dockerfile index 54653dbb138..1913d49f6e7 100644 --- a/cmd/suggestion/hyperopt/v1alpha3/Dockerfile +++ b/cmd/suggestion/hyperopt/v1alpha3/Dockerfile @@ -1,13 +1,18 @@ FROM python:3 -ADD . /usr/src/app/github.com/kubeflow/katib -WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/hyperopt/v1alpha3 RUN if [ "$(uname -m)" = "ppc64le" ]; then \ apt-get -y update && \ apt-get -y install gfortran libopenblas-dev liblapack-dev && \ pip install cython; \ fi +RUN GRPC_HEALTH_PROBE_VERSION=v0.3.0 && \ + wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /bin/grpc_health_probe + +ADD . /usr/src/app/github.com/kubeflow/katib +WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/hyperopt/v1alpha3 RUN pip install --no-cache-dir -r requirements.txt -ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python + +ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/health/python ENTRYPOINT ["python", "main.py"] diff --git a/cmd/suggestion/hyperopt/v1alpha3/main.py b/cmd/suggestion/hyperopt/v1alpha3/main.py index d12ef53f638..6f0869e2845 100644 --- a/cmd/suggestion/hyperopt/v1alpha3/main.py +++ b/cmd/suggestion/hyperopt/v1alpha3/main.py @@ -1,6 +1,7 @@ import grpc import time from pkg.apis.manager.v1alpha3.python import api_pb2_grpc +from pkg.apis.manager.health.python import health_pb2_grpc from pkg.suggestion.v1alpha3.hyperopt_service import HyperoptService from concurrent import futures @@ -9,7 +10,9 @@ def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) - api_pb2_grpc.add_SuggestionServicer_to_server(HyperoptService(), server) + service = HyperoptService() + api_pb2_grpc.add_SuggestionServicer_to_server(service, server) + health_pb2_grpc.add_HealthServicer_to_server(service, server) server.add_insecure_port(DEFAULT_PORT) print("Listening...") server.start() diff --git a/pkg/controller.v1alpha3/consts/const.go b/pkg/controller.v1alpha3/consts/const.go index c8ff53a9794..35be3a0185f 100644 --- a/pkg/controller.v1alpha3/consts/const.go +++ b/pkg/controller.v1alpha3/consts/const.go @@ -12,7 +12,9 @@ const ( ContainerSuggestion = "suggestion" - DefaultSuggestionPort = 6789 + DefaultSuggestionPort = 6789 + DefaultSuggestionPortName = "katib-api" + DefaultGRPCService = "manager.v1alpha3.Suggestion" // Default env name of katib namespace DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE" diff --git a/pkg/controller.v1alpha3/suggestion/composer/composer.go b/pkg/controller.v1alpha3/suggestion/composer/composer.go index a5dfc5bebb8..1d51f4692b9 100644 --- a/pkg/controller.v1alpha3/suggestion/composer/composer.go +++ b/pkg/controller.v1alpha3/suggestion/composer/composer.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -19,6 +20,12 @@ import ( "github.com/kubeflow/katib/pkg/controller.v1alpha3/util" ) +const ( + defaultInitialDelaySeconds = 10 + // Ref https://github.com/grpc-ecosystem/grpc-health-probe/ + defaultGRPCHealthCheckProbe = "/bin/grpc_health_probe" +) + var log = logf.Log.WithName("suggestion-composer") type Composer interface { @@ -75,7 +82,7 @@ func (g *General) DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1. func (g *General) DesiredService(s *suggestionsv1alpha3.Suggestion) (*corev1.Service, error) { ports := []corev1.ServicePort{ { - Name: "katib-api", + Name: consts.DefaultSuggestionPortName, Port: consts.DefaultSuggestionPort, }, } @@ -109,6 +116,36 @@ func (g *General) desiredContainer(s *suggestionsv1alpha3.Suggestion) (*corev1.C Name: consts.ContainerSuggestion, } c.Image = suggestionContainerImage + c.Ports = []corev1.ContainerPort{ + { + Name: consts.DefaultSuggestionPortName, + ContainerPort: consts.DefaultSuggestionPort, + }, + } + c.ReadinessProbe = &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{ + defaultGRPCHealthCheckProbe, + fmt.Sprintf("-addr=:%d", consts.DefaultSuggestionPort), + fmt.Sprintf("-service=%s", consts.DefaultGRPCService), + }, + }, + }, + InitialDelaySeconds: defaultInitialDelaySeconds, + } + c.LivenessProbe = &corev1.Probe{ + Handler: corev1.Handler{ + Exec: &corev1.ExecAction{ + Command: []string{ + defaultGRPCHealthCheckProbe, + fmt.Sprintf("-addr=:%d", consts.DefaultSuggestionPort), + fmt.Sprintf("-service=%s", consts.DefaultGRPCService), + }, + }, + }, + InitialDelaySeconds: defaultInitialDelaySeconds, + } return c, nil } diff --git a/pkg/suggestion/v1alpha3/base_health_service.py b/pkg/suggestion/v1alpha3/base_health_service.py new file mode 100644 index 00000000000..6da6787302a --- /dev/null +++ b/pkg/suggestion/v1alpha3/base_health_service.py @@ -0,0 +1,167 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Reference implementation for health checking in gRPC Python.""" + +import collections +import threading + +import grpc + +from pkg.apis.manager.health.python import health_pb2 as _health_pb2 +from pkg.apis.manager.health.python import health_pb2_grpc as _health_pb2_grpc + +SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name + + +class _Watcher(): + + def __init__(self): + self._condition = threading.Condition() + self._responses = collections.deque() + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while not self._responses and self._open: + self._condition.wait() + if self._responses: + return self._responses.popleft() + else: + raise StopIteration() + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def add(self, response): + with self._condition: + self._responses.append(response) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + +def _watcher_to_send_response_callback_adapter(watcher): + + def send_response_callback(response): + if response is None: + watcher.close() + else: + watcher.add(response) + + return send_response_callback + + +class HealthServicer(_health_pb2_grpc.HealthServicer): + """Servicer handling RPCs for service statuses.""" + + def __init__(self, + experimental_non_blocking=True, + experimental_thread_pool=None): + self._lock = threading.RLock() + self._server_status = {} + self._send_response_callbacks = {} + self.Watch.__func__.experimental_non_blocking = experimental_non_blocking + self.Watch.__func__.experimental_thread_pool = experimental_thread_pool + self._gracefully_shutting_down = False + self.set("manager.v1alpha3.Suggestion", _health_pb2.HealthCheckResponse.SERVING) + + def _on_close_callback(self, send_response_callback, service): + + def callback(): + with self._lock: + self._send_response_callbacks[service].remove( + send_response_callback) + send_response_callback(None) + + return callback + + def Check(self, request, context): + with self._lock: + status = self._server_status.get(request.service) + if status is None: + print(request.service) + context.set_code(grpc.StatusCode.NOT_FOUND) + return _health_pb2.HealthCheckResponse() + else: + return _health_pb2.HealthCheckResponse(status=status) + + # pylint: disable=arguments-differ + def Watch(self, request, context, send_response_callback=None): + blocking_watcher = None + if send_response_callback is None: + # The server does not support the experimental_non_blocking + # parameter. For backwards compatibility, return a blocking response + # generator. + blocking_watcher = _Watcher() + send_response_callback = _watcher_to_send_response_callback_adapter( + blocking_watcher) + service = request.service + with self._lock: + status = self._server_status.get(service) + if status is None: + status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member + send_response_callback( + _health_pb2.HealthCheckResponse(status=status)) + if service not in self._send_response_callbacks: + self._send_response_callbacks[service] = set() + self._send_response_callbacks[service].add(send_response_callback) + context.add_callback( + self._on_close_callback(send_response_callback, service)) + return blocking_watcher + + def set(self, service, status): + """Sets the status of a service. + + Args: + service: string, the name of the service. NOTE, '' must be set. + status: HealthCheckResponse.status enum value indicating the status of + the service + """ + with self._lock: + if self._gracefully_shutting_down: + return + else: + self._server_status[service] = status + if service in self._send_response_callbacks: + for send_response_callback in self._send_response_callbacks[ + service]: + send_response_callback( + _health_pb2.HealthCheckResponse(status=status)) + + def enter_graceful_shutdown(self): + """Permanently sets the status of all services to NOT_SERVING. + + This should be invoked when the server is entering a graceful shutdown + period. After this method is invoked, future attempts to set the status + of a service will be ignored. + + This is an EXPERIMENTAL API. + """ + with self._lock: + if self._gracefully_shutting_down: + return + else: + for service in self._server_status: + self.set(service, + _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member + self._gracefully_shutting_down = True diff --git a/pkg/suggestion/v1alpha3/hyperopt_service.py b/pkg/suggestion/v1alpha3/hyperopt_service.py index b6817fe10ae..89ce2950fce 100644 --- a/pkg/suggestion/v1alpha3/hyperopt_service.py +++ b/pkg/suggestion/v1alpha3/hyperopt_service.py @@ -2,15 +2,18 @@ from pkg.apis.manager.v1alpha3.python import api_pb2 from pkg.apis.manager.v1alpha3.python import api_pb2_grpc +from pkg.apis.manager.health.python import health_pb2 + from pkg.suggestion.v1alpha3.internal.search_space import HyperParameter, HyperParameterSearchSpace from pkg.suggestion.v1alpha3.internal.trial import Trial, Assignment from pkg.suggestion.v1alpha3.hyperopt.base_hyperopt_service import BaseHyperoptService +from pkg.suggestion.v1alpha3.base_health_service import HealthServicer logger = logging.getLogger("HyperoptRandomService") class HyperoptService( - api_pb2_grpc.SuggestionServicer): + api_pb2_grpc.SuggestionServicer, HealthServicer): def GetSuggestions(self, request, context): """ Main function to provide suggestion. diff --git a/scripts/v1alpha3/build.sh b/scripts/v1alpha3/build.sh index 1f6b1354ee8..5e9c4663d7c 100755 --- a/scripts/v1alpha3/build.sh +++ b/scripts/v1alpha3/build.sh @@ -26,6 +26,9 @@ SCRIPT_ROOT=$(dirname ${BASH_SOURCE})/../.. cd ${SCRIPT_ROOT} +echo "Building suggestion images..." +docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/suggestion-hyperopt -f ${CMD_PREFIX}/suggestion/hyperopt/v1alpha3/Dockerfile . + echo "Building core image..." docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/katib-controller -f ${CMD_PREFIX}/katib-controller/v1alpha3/Dockerfile . docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/katib-manager -f ${CMD_PREFIX}/manager/v1alpha3/Dockerfile . @@ -39,6 +42,3 @@ docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/file-metrics-collector -f ${CMD_P echo "Building TF Event metrics collector image..." docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/tfevent-metrics-collector -f ${CMD_PREFIX}/metricscollector/v1alpha3/tfevent-metricscollector/Dockerfile . - -echo "Building suggestion images..." -docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/suggestion-hyperopt -f ${CMD_PREFIX}/suggestion/hyperopt/v1alpha3/Dockerfile . From ddd3da9459175389c4698c79b4aae2e3c66bd978 Mon Sep 17 00:00:00 2001 From: Ce Gao Date: Fri, 20 Sep 2019 11:35:55 +0800 Subject: [PATCH 2/2] fix: Fix ut python Signed-off-by: Ce Gao --- test/scripts/v1alpha3/python-tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/scripts/v1alpha3/python-tests.sh b/test/scripts/v1alpha3/python-tests.sh index 3592a242014..b08ba2132ec 100755 --- a/test/scripts/v1alpha3/python-tests.sh +++ b/test/scripts/v1alpha3/python-tests.sh @@ -20,7 +20,7 @@ set -o errexit set -o nounset set -o pipefail -export PYTHONPATH=$(pwd):$(pwd)/pkg/apis/manager/v1alpha3/python +export PYTHONPATH=$(pwd):$(pwd)/pkg/apis/manager/v1alpha3/python:$(pwd)/pkg/apis/manager/health/python pip install -r test/suggestion/v1alpha3/test_requirements.txt pip install -r cmd/suggestion/hyperopt/v1alpha3/requirements.txt pytest -s ./test