This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

feat(service): remove async dump for better stability #314

Merged · 5 commits · Oct 10, 2019
3 changes: 2 additions & 1 deletion gnes/cli/parser.py
@@ -178,7 +178,8 @@ def set_service_parser(parser=None):
parser.add_argument('--timeout', type=int, default=-1,
help='timeout (ms) of all communication, -1 for waiting forever')
parser.add_argument('--dump_interval', type=int, default=5,
help='serialize the service to a file every n seconds, -1 means --read_only')
help='serialize the model in the service every n seconds if model changes. '
'-1 means --read_only. ')
parser.add_argument('--read_only', action='store_true', default=False,
help='do not allow the service to modify the model, '
'dump_interval will be ignored')
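The reworded help text above is the contract for the new synchronous dump: a positive `--dump_interval` dumps the model at most every n seconds and only if it has changed, while `-1` behaves like `--read_only`. A minimal sketch of how these flags parse, using the `set_service_parser` shown in this hunk (the concrete values are illustrative only):

    # sketch only: exercises the flags documented in the hunk above
    from gnes.cli.parser import set_service_parser

    args = set_service_parser().parse_args(['--dump_interval', '5'])
    # dump at most every 5 s, and only when the model has changed
    assert args.dump_interval == 5 and not args.read_only

    args = set_service_parser().parse_args(['--read_only'])
    # read-only service: the model is never modified, so dump_interval is ignored
    assert args.read_only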
11 changes: 7 additions & 4 deletions gnes/flow/__init__.py
@@ -62,13 +62,15 @@ class Flow:

You can use `.add()` then `.build()` to customize your own workflow.
For example:

.. highlight:: python
.. code-block:: python

f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Preprocessor, yaml_path='BasePreprocessor')
.add(gfs.Encoder, yaml_path='BaseEncoder')
.add(gfs.Router, yaml_path='BaseRouter'))

with f.build(backend='thread') as flow:
flow.index()
...
@@ -77,6 +79,7 @@ class Flow:
Note the different default copy behaviors in `.add()` and `.build()`:
`.add()` always copies the flow by default, whereas `.build()` modifies the flow in place.
You can change this behavior by giving an argument `copy_flow=False`.

"""
_supported_orch = {'swarm', 'k8s'}
_service2parser = {
@@ -211,14 +214,14 @@ def add(self, service: 'Service',
**kwargs) -> 'Flow':
"""
Add a service to the current flow object and return the new modified flow object
:param copy_flow: when set to true, then always copy the current flow
and do the modification on top of it then return, otherwise, do in-line modification

:param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
:param name: the name identifier of the service, useful in 'service_in' and 'service_out'
:param service_in: the name of the service(s) that this service receives data from.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param service_out: the name of the service(s) that this service sends data to.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param copy_flow: when set to true, always copy the current flow, do the modification on the copy and return it; otherwise, modify the flow in place
:param kwargs: other keyword-value arguments that the service CLI supports
:return: a (new) flow object with modification
"""
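The updated docstring spells out `.add()`'s parameters and the `copy_flow` default. A short sketch of the named-service wiring it describes (assuming `Service` is importable from `gnes.flow` alongside `Flow`, as the `gfs.` prefix in the class docstring suggests; the service names and YAML paths are placeholders):

    # illustrative only: wiring services by name via service_in, and the copy_flow default
    from gnes.flow import Flow, Service as gfs

    f = (Flow(check_version=False)
         .add(gfs.Preprocessor, name='prep', yaml_path='BasePreprocessor')
         .add(gfs.Encoder, name='enc', yaml_path='BaseEncoder', service_in='prep')
         .add(gfs.Indexer, yaml_path='BaseIndexer', service_in='enc'))

    f2 = f.add(gfs.Router, yaml_path='BaseRouter')               # copy_flow=True (default): f is untouched
    f.add(gfs.Router, yaml_path='BaseRouter', copy_flow=False)   # modifies f in place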
55 changes: 26 additions & 29 deletions gnes/service/base.py
@@ -320,6 +320,7 @@ def __init__(self, args):
self.is_event_loop = self._get_event()
self.is_model_changed = self._get_event()
self.is_handler_done = self._get_event()
self.last_dump_time = time.perf_counter()
self._model = None
self.use_event_loop = True
self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl)
@@ -335,29 +336,20 @@ def run(self):
except Exception as ex:
self.logger.error(ex, exc_info=True)

def _start_auto_dump(self):
if self.args.dump_interval > 0 and not self.args.read_only:
self._auto_dump_thread = threading.Thread(target=self._auto_dump)
self._auto_dump_thread.setDaemon(1)
self._auto_dump_thread.start()

def _auto_dump(self):
while self.is_event_loop.is_set():
if self.is_model_changed.is_set():
self.is_model_changed.clear()
self.logger.info(
'auto-dumping the new change of the model every %ds...' % self.args.dump_interval)
self.dump()
time.sleep(self.args.dump_interval)

def dump(self):
if not self.args.read_only:
if self._model:
self.logger.info('dumping changes to the model...')
self._model.dump()
self.logger.info('dumping finished!')
else:
self.logger.info('no dumping as "read_only" set to true.')
def dump(self, respect_dump_interval: bool = True):
if (not self.args.read_only
and self.args.dump_interval > 0
and self._model
and self.is_model_changed.is_set()
and (respect_dump_interval
and (time.perf_counter() - self.last_dump_time) > self.args.dump_interval)
or not respect_dump_interval):
self.is_model_changed.clear()
self.logger.info('dumping changes to the model, %3.0fs since the last dump'
% (time.perf_counter() - self.last_dump_time))
self._model.dump()
self.last_dump_time = time.perf_counter()
self.logger.info('dumping finished! next dump will start in at least %3.0fs' % self.args.dump_interval)

@handler.register_hook(hook_type='post')
def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', *args, **kwargs):
@@ -414,17 +406,17 @@ def _run(self, ctx):
self.post_init()
self.is_ready.set()
self.is_event_loop.set()
self._start_auto_dump()
self.logger.critical('ready and listening')
while self.is_event_loop.is_set():
pull_sock = None
socks = dict(poller.poll())
socks = dict(poller.poll(1))
if socks.get(in_sock) == zmq.POLLIN:
pull_sock = in_sock
elif socks.get(ctrl_sock) == zmq.POLLIN:
pull_sock = ctrl_sock
else:
self.logger.error('received message from unknown socket: %s' % socks)
# no message received, pass
continue

if self.use_event_loop or pull_sock == ctrl_sock:
with TimeContext('handling message', self.logger):
self.is_handler_done.clear()
@@ -450,10 +442,13 @@
self.logger.warning(
'received a new message but since "use_event_loop=False" I will not handle it. '
'I will just block the thread until "is_handler_done" is set!')
# wait until someone else calls is_handler_done.set()
self.is_handler_done.wait()
# clear the handler status
self.is_handler_done.clear()
if self.args.dump_interval == 0:
self.dump()

# block the event loop if a dump is needed
self.dump()
except EventLoopEnd:
self.logger.info('break from the event loop')
except ComponentNotLoad:
@@ -466,6 +461,8 @@
in_sock.close()
out_sock.close()
ctrl_sock.close()
# skip the dump_interval check, as this is the last dump before close
self.dump(respect_dump_interval=False)
self.logger.critical('terminated')

def post_init(self):
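Condensed, the new gate in `dump()` reads roughly as the predicate below (a sketch, assuming the trailing `or not respect_dump_interval` is meant to bypass only the time check; the read-only, interval and model-changed guards are taken from the hunk above):

    import time

    def should_dump(read_only: bool, dump_interval: int, model, model_changed: bool,
                    last_dump_time: float, respect_dump_interval: bool = True) -> bool:
        # never dump a read-only service, a disabled interval, or a missing/unchanged model
        if read_only or dump_interval <= 0 or model is None or not model_changed:
            return False
        # bypass the time gate when explicitly requested, e.g. the final dump before close
        if not respect_dump_interval:
            return True
        # otherwise dump only if at least dump_interval seconds passed since the last dump
        return (time.perf_counter() - last_dump_time) > dump_interval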
5 changes: 0 additions & 5 deletions gnes/service/frontend.py
@@ -131,11 +131,6 @@ def get_response(num_recv, blocked=False):
zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs)
self.pending_request += 1

num_recv = max(self.pending_request - self.args.max_pending_request, 1)

# switch to blocked recv when too many pending requests
yield from get_response(num_recv, num_recv > 1)

yield from get_response(self.pending_request, blocked=True)

class ZmqContext:
6 changes: 0 additions & 6 deletions tests/test_service_mgr.py
@@ -97,8 +97,6 @@ def test_external_module(self):

with ServiceManager(RouterService, args):
pass
self.assertTrue(os.path.exists('foo_contrib_encoder.bin'))
os.remove('foo_contrib_encoder.bin')

def test_override_module(self):
args = set_indexer_parser().parse_args([
@@ -108,8 +106,6 @@ def test_override_module(self):

with ServiceManager(IndexerService, args):
pass
self.assertTrue(os.path.exists('foo_contrib_encoder.bin'))
os.remove('foo_contrib_encoder.bin')

def test_override_twice_module(self):
args = set_indexer_parser().parse_args([
@@ -120,8 +116,6 @@ def test_override_twice_module(self):

with ServiceManager(IndexerService, args):
pass
self.assertTrue(os.path.exists('foo_contrib_encoder.bin'))
os.remove('foo_contrib_encoder.bin')

def test_grpc_with_pub(self):
self._test_grpc_multiple_pub('thread', 1)