Skip to content

Commit

Permalink
refactor: log server config
Browse files Browse the repository at this point in the history
Signed-off-by: Han Xiao <han.xiao@jina.ai>
  • Loading branch information
hanxiao committed Apr 9, 2020
1 parent edaf115 commit 301b41a
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 42 deletions.
1 change: 1 addition & 0 deletions jina/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
JINA_GLOBAL.imported.drivers = False
JINA_GLOBAL.stack = SimpleNamespace()
JINA_GLOBAL.stack.id = random.randint(0, 10000)
JINA_GLOBAL.logserver = SimpleNamespace()


def import_classes(namespace: str, targets=None,
Expand Down
40 changes: 20 additions & 20 deletions jina/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import ruamel.yaml
from ruamel.yaml import StringIO

from .. import __default_host__
from .. import JINA_GLOBAL
from ..enums import FlowBuildLevel, FlowOptimizeLevel
from ..excepts import FlowTopologyError, FlowMissingPodError, FlowBuildLevelError, FlowConnectivityError
from ..helper import yaml, expand_env_var, kwargs2list, get_non_defaults_args
Expand Down Expand Up @@ -89,8 +89,6 @@ def __init__(self, args: 'argparse.Namespace' = None, **kwargs):

self.args = args
self._common_kwargs = kwargs
self.host_sse = __default_host__

self._pod_nodes = OrderedDict() # type: Dict[str, 'FlowPod']
self._build_level = FlowBuildLevel.EMPTY
self._pod_name_counter = 0
Expand Down Expand Up @@ -154,7 +152,8 @@ def save_config(self, filename: str = None) -> bool:
self.logger.info(f'{self}\'s yaml config is save to %s' % f)
return True

def _get_yaml_config(self):
@property
def yaml_spec(self):
yaml.register_class(Flow)
stream = StringIO()
yaml.dump(self, stream)
Expand Down Expand Up @@ -428,32 +427,33 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

@build_required(FlowBuildLevel.GRAPH)
def start(self):
"""Start to run all Pods in this Flow.
Remember to close the Flow with :meth:`close`.
Note that this method has a timeout of ``timeout_ready`` set in CLI,
which is inherited all the way from :class:`jina.peapods.peas.BasePea`
"""
def start_log_server(self):
try:
import flask, flask_cors
self.sse_logger = threading.Thread(name='sentinel-sse-logger',
target=start_sse_logger, daemon=True,
args=(self.host_sse,
self.args.port_sse,
self.args.log_endpoint,
self.args.yaml_endpoint,
''))
args=(self.args.logserver_config,
self.yaml_spec))
self.sse_logger.start()
time.sleep(1)

except ModuleNotFoundError:
self.logger.error(
f'sse logger can not start and being disabled because of flask and flask_cors are missing, '
f'sse log-server can not start because of "flask" and "flask_cors" are missing, '
f'use "pip install jina[http]" to install the dependencies')

@build_required(FlowBuildLevel.GRAPH)
def start(self):
"""Start to run all Pods in this Flow.
Remember to close the Flow with :meth:`close`.
Note that this method has a timeout of ``timeout_ready`` set in CLI,
which is inherited all the way from :class:`jina.peapods.peas.BasePea`
"""
if self.args.logserver:
self.start_log_server()

self._pod_stack = ExitStack()
for v in self._pod_nodes.values():
self._pod_stack.enter_context(v)
Expand Down Expand Up @@ -481,7 +481,7 @@ def close(self):
if hasattr(self, '_pod_stack'):
self._pod_stack.close()
if hasattr(self, 'sse_logger') and self.sse_logger.is_alive():
requests.get(f'http://{self.host_sse}:{self.args.port_sse}/shutdown')
requests.get(JINA_GLOBAL.logserver.shutdown)
self.sse_logger.join()
self._build_level = FlowBuildLevel.EMPTY
# time.sleep(1) # sleep for a while until all resources are safely closed
Expand Down
37 changes: 22 additions & 15 deletions jina/logging/sse.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import logging

from jina.logging import default_logger

from . import default_logger
from .queue import __sse_queue__, __profile_queue__
from .. import JINA_GLOBAL
from ..helper import yaml


def start_sse_logger(host: str, port: int,
log_endpoint: str, yaml_endpoint: str,
yaml_flow: str):
def start_sse_logger(server_config_path: str, flow_yaml: str = None):
"""Start a logger that emits server-side event from the log queue, so that one can use a browser to monitor the logs
:param host: host address of the server
:param port: port of the server
:param log_endpoint: endpoint for the log
:param yaml_endpoint: endpoint for the yaml
:param endpoint_log: endpoint for the log
:param endpoint_yaml: endpoint for the yaml
Example:
Expand All @@ -38,42 +37,50 @@ def start_sse_logger(host: str, port: int,
'they are required for serving HTTP requests.'
'Please use "pip install jina[flask]" to install it.')

with open(server_config_path) as fp:
_config = yaml.load(fp)
JINA_GLOBAL.logserver.address = f'http://{_config["host"]}:{_config["port"]}'

JINA_GLOBAL.logserver.ready = '/'.join([JINA_GLOBAL.logserver.address, _config['endpoints']['ready']])
JINA_GLOBAL.logserver.shutdown = '/'.join([JINA_GLOBAL.logserver.address, _config['endpoints']['shutdown']])

app = Flask(__name__)
CORS(app)

@app.route(log_endpoint)
@app.route(_config['endpoints']['log'])
def get_log():
"""Get the logs, endpoint `/log/stream` """
return Response(_log_stream(), mimetype="text/event-stream")

@app.route(yaml_endpoint)
@app.route(_config['endpoints']['yaml'])
def get_yaml():
"""Get the yaml of the flow """
return yaml_flow
return flow_yaml

@app.route('/stream/profile')
@app.route(_config['endpoints']['profile'])
def get_profile():
"""Get the profile logs, endpoint `/profile/stream` """
return Response(_profile_stream(), mimetype="text/event-stream")
return Response(_profile_stream(), mimetype='text/event-stream')

@app.route('/shutdown')
@app.route(_config['endpoints']['shutdown'])
def shutdown():
from flask import request
if not 'werkzeug.server.shutdown' in request.environ:
raise RuntimeError('Not running the development server')
request.environ['werkzeug.server.shutdown']()
return 'Server shutting down...'

@app.route('/is_ready')
@app.route(_config['endpoints']['ready'])
def is_ready():
return Response(status=200)

# os.environ['WERKZEUG_RUN_MAIN'] = 'true'
log = logging.getLogger('werkzeug')
log.disabled = True

try:
app.logger.disabled = True
app.run(port=port, host=host)
app.run(port=_config['port'], host=_config['host'])
except Exception as ex:
default_logger.error(ex)

Expand Down
15 changes: 8 additions & 7 deletions jina/main/parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import argparse

from ..helper import colored, random_port
from ..helper import colored


def add_arg_group(parser, title):
Expand Down Expand Up @@ -76,12 +76,13 @@ def set_flow_parser(parser=None):

gp = add_arg_group(parser, 'flow arguments')
gp.add_argument('--yaml-path', type=str, help='a yaml file represents a flow')
gp.add_argument('--port-sse', type=int, default=random_port(),
help='the port number for sse logging, default a random port between [49152, 65535]')
gp.add_argument('--log-endpoint', type=str, default='/log/stream',
help='endpoint for real-time logging')
gp.add_argument('--yaml-endpoint', type=str, default='/yaml',
help='endpoint for real-time logging')
gp.add_argument('--logserver', action='store_true', default=False,
help='start a log server for the dashboard')
from pkg_resources import resource_filename
gp.add_argument('--logserver-config', type=str,
default=resource_filename('jina',
'/'.join(('resources', 'logserver.default.yml'))),
help='the yaml config of the log server')
gp.add_argument('--optimize-level', type=FlowOptimizeLevel.from_string, default=FlowOptimizeLevel.NONE,
help='removing redundant routers from the flow. Note, this may change the gateway zmq socket to BIND \
and hence not allow multiple clients connected to the gateway at the same time.')
Expand Down
10 changes: 10 additions & 0 deletions jina/resources/logserver.default.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
host: 0.0.0.0
port: 5000
endpoints:
log: /stream/log # fetching log in SSE stream
profile: /stream/profile # fetching profiling log in SSE stream
yaml: /data/yaml # get the YAML spec of a flow
shutdown: /action/shutdown # shutdown the log server
ready: /action/isready # tell if the log server is ready, return 200 if yes


0 comments on commit 301b41a

Please sign in to comment.