Skip to content

Commit

Permalink
Merge pull request #239 from jina-ai/fix-sse-logger-3
Browse files Browse the repository at this point in the history
fix: sse logger shutdown problem
  • Loading branch information
hanxiao authored Apr 13, 2020
2 parents 11d86b4 + 8dc37dd commit 813738b
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 10 deletions.
14 changes: 9 additions & 5 deletions jina/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import requests
import ruamel.yaml
from requests import Timeout
from ruamel.yaml import StringIO

from .. import JINA_GLOBAL
Expand Down Expand Up @@ -443,11 +444,14 @@ def start_log_server(self):
self.yaml_spec))
self.sse_logger.start()
time.sleep(1)

requests.get(JINA_GLOBAL.logserver.ready, timeout=5)
self.logger.success(f'logserver is started and available at {JINA_GLOBAL.logserver.address}')
except ModuleNotFoundError:
self.logger.error(
f'sse log-server can not start because of "flask" and "flask_cors" are missing, '
f'sse logserver can not start because of "flask" and "flask_cors" are missing, '
f'use "pip install jina[http]" to install the dependencies')
except Timeout:
self.logger.error('logserver fails to start')

@build_required(FlowBuildLevel.GRAPH)
def start(self):
Expand All @@ -459,6 +463,7 @@ def start(self):
which is inherited all the way from :class:`jina.peapods.peas.BasePea`
"""
if self.args.logserver:
self.logger.info('start logserver...')
self.start_log_server()

self._pod_stack = ExitStack()
Expand Down Expand Up @@ -487,9 +492,8 @@ def close(self):
"""Close the flow and release all resources associated to it. """
if hasattr(self, '_pod_stack'):
self._pod_stack.close()
if hasattr(self, 'sse_logger') and self.sse_logger.is_alive():
requests.get(JINA_GLOBAL.logserver.shutdown)
self.sse_logger.join()
# if hasattr(self, 'sse_logger') and self.sse_logger.is_alive():
# self.sse_logger.stop()
self._build_level = FlowBuildLevel.EMPTY
# time.sleep(1) # sleep for a while until all resources are safely closed
self.logger.success(
Expand Down
7 changes: 5 additions & 2 deletions jina/logging/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ def start_sse_logger(server_config_path: str, flow_yaml: str = None):
'they are required for serving HTTP requests.'
'Please use "pip install jina[flask]" to install it.')

with open(server_config_path) as fp:
_config = yaml.load(fp)
try:
with open(server_config_path) as fp:
_config = yaml.load(fp)
except Exception as ex:
default_logger.error(ex)
JINA_GLOBAL.logserver.address = f'http://{_config["host"]}:{_config["port"]}'

JINA_GLOBAL.logserver.ready = JINA_GLOBAL.logserver.address + _config['endpoints']['ready']
Expand Down
6 changes: 4 additions & 2 deletions jina/peapods/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading

import grpc
from jina.logging.profile import TimeContext

from .grpc_asyncio import AsyncioExecutor
from .zmq import AsyncZmqlet, add_envelope
Expand Down Expand Up @@ -110,8 +111,9 @@ def prefetch_req(num_req, fetch_to):
return True
return False


is_req_empty = prefetch_req(self.args.prefetch, prefetch_task)
with TimeContext(f'prefetching {self.args.prefetch} requests', self.logger):
self.logger.info('if this takes too long, you may want to reduce "--prefetch"')
is_req_empty = prefetch_req(self.args.prefetch, prefetch_task)

while not (zmqlet.msg_sent == zmqlet.msg_recv != 0 and is_req_empty):
self.logger.info(f'send: {zmqlet.msg_sent} '
Expand Down
2 changes: 1 addition & 1 deletion jina/resources/logserver.default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ endpoints:
profile: /stream/profile # fetching profiling log in SSE stream
yaml: /data/yaml # get the YAML spec of a flow
shutdown: /action/shutdown # shutdown the log server
ready: /action/isready # tell if the log server is ready, return 200 if yes
ready: /status/ready # tell if the log server is ready, return 200 if yes


9 changes: 9 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import unittest

import requests
from jina import JINA_GLOBAL
from jina.enums import FlowOptimizeLevel
from jina.flow import Flow
from jina.main.checker import NetworkChecker
Expand Down Expand Up @@ -124,6 +126,13 @@ def test_flow_yaml_dump(self):
self.assertEqual(f.args.optimize_level, fl.args.optimize_level)
self.add_tmpfile('test1.yml')

def test_flow_log_server(self):
f = Flow().load_config('yaml/test_log_server.yml')
with f.build() as fl:
print(JINA_GLOBAL.logserver.ready)
a = requests.get(JINA_GLOBAL.logserver.ready, timeout=5)
self.assertEqual(a.status_code, 200)


if __name__ == '__main__':
unittest.main()
4 changes: 4 additions & 0 deletions tests/yaml/test_log_server.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
!Flow
with:
runtime: process
logserver: true

0 comments on commit 813738b

Please sign in to comment.