Skip to content

Commit

Permalink
Merge pull request #180 from jina-ai/fix-sse-thread
Browse files Browse the repository at this point in the history
fix: sse logger threading exception
  • Loading branch information
hanxiao authored Apr 2, 2020
2 parents 4701c2d + ee290c1 commit f2e1b25
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 28 deletions.
8 changes: 2 additions & 6 deletions jina/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from ..excepts import EmptyExecutorYAML, BadWorkspace, BadPersistantFile, NoDriverForRequest, UnattachedDriver
from ..helper import yaml, PathImporter, expand_dict, expand_env_var, valid_yaml_path
from ..logging.base import get_logger
from ..logging.profile import profiling

__all__ = ['BaseExecutor', 'AnyExecutor', 'ExecutorType']

Expand Down Expand Up @@ -66,8 +65,8 @@ def wrap_func(func_lst, wrapper):
if cls.__name__ not in reg_cls_set:
# print('reg class: %s' % cls.__name__)
cls.__init__ = store_init_kwargs(cls.__init__)
if 'JINA_PROFILING' in os.environ:
wrap_func(prof_funcs, profiling)
# if 'JINA_PROFILING' in os.environ:
# wrap_func(prof_funcs, profiling)

wrap_func(train_funcs, as_train_method)
wrap_func(update_funcs, as_update_method)
Expand Down Expand Up @@ -283,7 +282,6 @@ def touch(self):
"""Touch the executor and change ``is_updated`` to ``True`` so that one can call :func:`save`. """
self.is_updated = True

@profiling
def save(self, filename: str = None) -> bool:
"""
Persist data of this executor to the :attr:`workspace` (or :attr:`replica_workspace`). The data could be
Expand Down Expand Up @@ -332,7 +330,6 @@ def save(self, filename: str = None) -> bool:
self.logger.success('artifacts of this executor (%s) is persisted to %s' % (self.name, f))
return True

@profiling
def save_config(self, filename: str = None) -> bool:
"""
Serialize the object to a yaml file
Expand Down Expand Up @@ -400,7 +397,6 @@ def load_config(cls: Type[AnyExecutor], filename: Union[str, TextIO], separated_
return yaml.load(tmp_s)

@staticmethod
@profiling
def load(filename: str = None) -> AnyExecutor:
"""Build an executor from a binary file
Expand Down
2 changes: 1 addition & 1 deletion jina/executors/encoders/nlp/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np

from .. import BaseTextEncoder
from ..helper import reduce_mean, reduce_max, reduce_cls
from ..helper import reduce_mean, reduce_max


class TransformerTextEncoder(BaseTextEncoder):
Expand Down
12 changes: 9 additions & 3 deletions jina/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import os
import tempfile
import threading
import time
from collections import OrderedDict
from contextlib import ExitStack
from functools import wraps
from typing import Union, Tuple, List, Set, Dict, Iterator, Callable, Type, TextIO, Any

import requests
import ruamel.yaml

from .. import __default_host__
Expand Down Expand Up @@ -424,12 +426,17 @@ def start(self):
which is inherited all the way from :class:`jina.peapods.peas.BasePea`
"""
try:
import flask, flask_cors
self.sse_logger = threading.Thread(name='sentinel-sse-logger',
target=start_sse_logger, daemon=True,
args=(self.host_sse, self.port_sse))
self.sse_logger.start()
except Exception as ex:
self.logger.error(f'sse logger can not start and being disabled because of the following error {ex}')
time.sleep(1)

except ModuleNotFoundError:
self.logger.error(
f'sse logger can not start and being disabled because of flask and flask_cors are missing, '
f'use "pip install jina[http]" to install the dependencies')

self._pod_stack = ExitStack()
for v in self._pod_nodes.values():
Expand Down Expand Up @@ -458,7 +465,6 @@ def close(self):
if hasattr(self, '_pod_stack'):
self._pod_stack.close()
if hasattr(self, 'sse_logger') and self.sse_logger.is_alive():
import requests
requests.get(f'http://{self.host_sse}:{self.port_sse}/shutdown')
self.sse_logger.join()
self._build_level = FlowBuildLevel.EMPTY
Expand Down
23 changes: 9 additions & 14 deletions jina/logging/profile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import time
from collections import defaultdict
from functools import wraps
Expand Down Expand Up @@ -42,19 +41,15 @@ def foo():

@wraps(func)
def arg_wrapper(*args, **kwargs):
if 'JINA_PROFILING' in os.environ:
start_t = time.perf_counter()
start_mem = used_memory(unit=1024 * 1024)
r = func(*args, **kwargs)
elapsed = time.perf_counter() - start_t
end_mem = used_memory(unit=1024 * 1024)
# level_prefix = ''.join('-' for v in inspect.stack() if v and v.index is not None and v.index >= 0)
level_prefix = ''
mem_status = 'memory: %4.3fMB -> %4.3fMB' % (start_mem, end_mem)

default_logger.info('%s%s time: %3.3fs %s' % (level_prefix, func.__qualname__, elapsed, mem_status))
else:
r = func(*args, **kwargs)
start_t = time.perf_counter()
start_mem = used_memory(unit=1024 * 1024)
r = func(*args, **kwargs)
elapsed = time.perf_counter() - start_t
end_mem = used_memory(unit=1024 * 1024)
# level_prefix = ''.join('-' for v in inspect.stack() if v and v.index is not None and v.index >= 0)
level_prefix = ''
mem_status = 'memory Δ %4.2fMB %4.2fMB -> %4.2fMB' % (end_mem - start_mem, start_mem, end_mem)
default_logger.info('%s%s time: %3.3fs %s' % (level_prefix, func.__qualname__, elapsed, mem_status))
return r

return arg_wrapper
Expand Down
3 changes: 1 addition & 2 deletions jina/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,9 @@ def save_executor(self, dump_interval: int = 0):
if self.executor.save():
self.logger.info('dumped changes to the executor, %3.0fs since last the save'
% (time.perf_counter() - self.last_dump_time))
self.last_dump_time = time.perf_counter()
else:
self.logger.info('executor says there is nothing to save')

self.last_dump_time = time.perf_counter()
# self.logger.info({'service': self.name,
# 'profile': self._timer.accum_time,
# 'timestamp_start': self._timer.start_time,
Expand Down
15 changes: 13 additions & 2 deletions jina/peapods/zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def __init__(self, args: 'argparse.Namespace', logger: 'logging.Logger' = None):
self.ctx, self.in_sock, self.out_sock, self.ctrl_sock = self.init_sockets()
self.bytes_sent = 0
self.bytes_recv = 0
self.msg_recv = 0
self.msg_sent = 0
self.poller = zmq.Poller()
self.poller.register(self.in_sock, zmq.POLLIN)
self.poller.register(self.ctrl_sock, zmq.POLLIN)
Expand Down Expand Up @@ -127,7 +129,12 @@ def close(self):
"""Close all sockets and shutdown the ZMQ context associated to this `Zmqlet`. """
self.close_sockets()
self.ctx.term()
self.logger.info('bytes_sent: %.0f KB bytes_recv:%.0f KB' % (self.bytes_sent / 1024, self.bytes_recv / 1024))
self.print_stats()

def print_stats(self):
"""Print out the network stats of of itself """
self.logger.info('msg_sent: %d bytes_sent: %.0f KB msg_recv: %d bytes_recv:%.0f KB' % (
self.msg_sent, self.bytes_sent / 1024, self.msg_recv, self.bytes_recv / 1024))

def send_message(self, msg: 'jina_pb2.Message'):
"""Send a message via the output socket
Expand All @@ -149,6 +156,7 @@ def send_message(self, msg: 'jina_pb2.Message'):
o_sock = self.out_sock

self.bytes_sent += send_message(o_sock, msg, **self.send_recv_kwargs)
self.msg_sent += 1

def recv_message(self, callback: Callable[['jina_pb2.Message'], None] = None) -> 'jina_pb2.Message':
"""Receive a protobuf message from the input socket
Expand All @@ -160,13 +168,16 @@ def recv_message(self, callback: Callable[['jina_pb2.Message'], None] = None) ->
if i_sock is not None:
msg, num_bytes = recv_message(i_sock, **self.send_recv_kwargs)
self.bytes_recv += num_bytes
self.msg_recv += 1
if callback:
return callback(msg)

def reset_bytes(self):
def clear_stats(self):
"""Reset the internal counter of send and receive bytes to zero. """
self.bytes_recv = 0
self.bytes_sent = 0
self.msg_recv = 0
self.msg_sent = 0


class AsyncZmqlet(Zmqlet):
Expand Down

0 comments on commit f2e1b25

Please sign in to comment.