Skip to content

Commit

Permalink
fix(pea): gracefully exit thread and process
Browse files Browse the repository at this point in the history
Signed-off-by: Han Xiao <han.xiao@jina.ai>
  • Loading branch information
hanxiao committed Apr 6, 2020
1 parent 85bc8c3 commit f7c6491
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
44 changes: 42 additions & 2 deletions jina/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,37 @@ def _get_event(obj):
raise NotImplementedError


def _make_or_event(obj, *events):
or_event = _get_event(obj)

def or_set(self):
self._set()
self.changed()

def or_clear(self):
self._clear()
self.changed()

def orify(e, changed_callback):
e._set = e.set
e._clear = e.clear
e.changed = changed_callback
e.set = lambda: or_set(e)
e.clear = lambda: or_clear(e)

def changed():
bools = [e.is_set() for e in events]
if any(bools):
or_event.set()
else:
or_event.clear()

for e in events:
orify(e, changed)
changed()
return or_event


class BasePea(metaclass=PeaMeta):
"""BasePea is an unary service unit which provides network interface and
communicates with others via protobuf and ZeroMQ
Expand All @@ -74,8 +105,13 @@ def __init__(self, args: 'argparse.Namespace'):
super().__init__()
self.args = args
self.name = self.__class__.__name__
self.daemon = True

self.is_ready = _get_event(self)
self.is_shutdown = _get_event(self)
self.ready_or_shutdown = _make_or_event(self, self.is_ready, self.is_shutdown)
self.is_shutdown.clear()

# self.is_busy = _get_event(self)
# # label the pea as busy until the loop body start
# self.is_busy.set()
Expand Down Expand Up @@ -318,6 +354,7 @@ def run(self):
finally:
self.loop_teardown()
self.unset_ready()
self.is_shutdown.set()

def check_memory_watermark(self):
"""Check the memory watermark """
Expand Down Expand Up @@ -347,10 +384,13 @@ def status(self):
def start(self):
super().start()
_timeout = getattr(self.args, 'timeout_ready', 5e3) / 1e3
if self.is_ready.wait(_timeout):
if self.ready_or_shutdown.wait(_timeout):
if self.is_shutdown.is_set():
self.logger.critical(f'fail to start {self.__class__} with name {self.name}')
return self
else:
raise TimeoutError('this BasePea (name=%s) can not be initialized after %dms' % (self.name, _timeout * 1e3))
raise TimeoutError(
f'{self.__class__} with name {self.name} can not be initialized after {_timeout * 1e3}ms')

def __enter__(self):
return self.start()
Expand Down
5 changes: 3 additions & 2 deletions jina/peapods/zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ def __init__(self, args: 'argparse.Namespace', logger: 'logging.Logger' = None):

self.ctrl_addr, self.ctrl_with_ipc = self.get_ctrl_address(args)
self.opened_socks = []
self.ctx, self.in_sock, self.out_sock, self.ctrl_sock = self.init_sockets()
self.bytes_sent = 0
self.bytes_recv = 0
self.msg_recv = 0
self.msg_sent = 0
self.ctx, self.in_sock, self.out_sock, self.ctrl_sock = self.init_sockets()
self.poller = zmq.Poller()
self.poller.register(self.in_sock, zmq.POLLIN)
self.poller.register(self.ctrl_sock, zmq.POLLIN)
Expand Down Expand Up @@ -152,7 +152,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def close(self):
"""Close all sockets and shutdown the ZMQ context associated to this `Zmqlet`. """
self.close_sockets()
self.ctx.term()
if hasattr(self, 'ctx'):
self.ctx.term()
self.print_stats()

def print_stats(self):
Expand Down
13 changes: 13 additions & 0 deletions tests/test_peapods.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ def _test_pea_context(runtime):
with self.subTest(runtime=j):
_test_pea_context(j)

def test_address_in_use(self):
args1 = set_pea_parser().parse_args(['--port_ctrl', '55555'])
args2 = set_pea_parser().parse_args(['--port_ctrl', '55555'])
with BasePea(args1), BasePea(args2):
pass

args1 = set_pea_parser().parse_args(['--port_ctrl', '55555', '--runtime', 'thread'])
args2 = set_pea_parser().parse_args(['--port_ctrl', '55555', '--runtime', 'thread'])
with BasePea(args1), BasePea(args2):
pass

print('everything should quit gracefully')

def test_pod_context(self):
def _test_pod_context(runtime):
args = set_pod_parser().parse_args(['--runtime', runtime, '--replicas', '2'])
Expand Down

0 comments on commit f7c6491

Please sign in to comment.