diff --git a/jina/executors/__init__.py b/jina/executors/__init__.py index 8a1eed4cd88ad..784845965b986 100644 --- a/jina/executors/__init__.py +++ b/jina/executors/__init__.py @@ -16,7 +16,6 @@ from ..excepts import EmptyExecutorYAML, BadWorkspace, BadPersistantFile, NoDriverForRequest, UnattachedDriver from ..helper import yaml, PathImporter, expand_dict, expand_env_var, valid_yaml_path from ..logging.base import get_logger -from ..logging.profile import profiling __all__ = ['BaseExecutor', 'AnyExecutor', 'ExecutorType'] @@ -66,8 +65,8 @@ def wrap_func(func_lst, wrapper): if cls.__name__ not in reg_cls_set: # print('reg class: %s' % cls.__name__) cls.__init__ = store_init_kwargs(cls.__init__) - if 'JINA_PROFILING' in os.environ: - wrap_func(prof_funcs, profiling) + # if 'JINA_PROFILING' in os.environ: + # wrap_func(prof_funcs, profiling) wrap_func(train_funcs, as_train_method) wrap_func(update_funcs, as_update_method) @@ -283,7 +282,6 @@ def touch(self): """Touch the executor and change ``is_updated`` to ``True`` so that one can call :func:`save`. """ self.is_updated = True - @profiling def save(self, filename: str = None) -> bool: """ Persist data of this executor to the :attr:`workspace` (or :attr:`replica_workspace`). The data could be @@ -332,7 +330,6 @@ def save(self, filename: str = None) -> bool: self.logger.success('artifacts of this executor (%s) is persisted to %s' % (self.name, f)) return True - @profiling def save_config(self, filename: str = None) -> bool: """ Serialize the object to a yaml file @@ -400,7 +397,6 @@ def load_config(cls: Type[AnyExecutor], filename: Union[str, TextIO], separated_ return yaml.load(tmp_s) @staticmethod - @profiling def load(filename: str = None) -> AnyExecutor: """Build an executor from a binary file diff --git a/jina/executors/encoders/nlp/transformer.py b/jina/executors/encoders/nlp/transformer.py index 02fca0a08cf41..2b4c5134a258b 100644 --- a/jina/executors/encoders/nlp/transformer.py +++ b/jina/executors/encoders/nlp/transformer.py @@ -3,7 +3,7 @@ import numpy as np from .. import BaseTextEncoder -from ..helper import reduce_mean, reduce_max, reduce_cls +from ..helper import reduce_mean, reduce_max class TransformerTextEncoder(BaseTextEncoder): diff --git a/jina/flow/__init__.py b/jina/flow/__init__.py index ebc1f5a07a4df..db836c3efdaad 100644 --- a/jina/flow/__init__.py +++ b/jina/flow/__init__.py @@ -2,11 +2,13 @@ import os import tempfile import threading +import time from collections import OrderedDict from contextlib import ExitStack from functools import wraps from typing import Union, Tuple, List, Set, Dict, Iterator, Callable, Type, TextIO, Any +import requests import ruamel.yaml from .. import __default_host__ @@ -424,12 +426,17 @@ def start(self): which is inherited all the way from :class:`jina.peapods.peas.BasePea` """ try: + import flask, flask_cors self.sse_logger = threading.Thread(name='sentinel-sse-logger', target=start_sse_logger, daemon=True, args=(self.host_sse, self.port_sse)) self.sse_logger.start() - except Exception as ex: - self.logger.error(f'sse logger can not start and being disabled because of the following error {ex}') + time.sleep(1) + + except ModuleNotFoundError: + self.logger.error( + f'sse logger can not start and being disabled because of flask and flask_cors are missing, ' + f'use "pip install jina[http]" to install the dependencies') self._pod_stack = ExitStack() for v in self._pod_nodes.values(): @@ -458,7 +465,6 @@ def close(self): if hasattr(self, '_pod_stack'): self._pod_stack.close() if hasattr(self, 'sse_logger') and self.sse_logger.is_alive(): - import requests requests.get(f'http://{self.host_sse}:{self.port_sse}/shutdown') self.sse_logger.join() self._build_level = FlowBuildLevel.EMPTY diff --git a/jina/logging/profile.py b/jina/logging/profile.py index d570ece22bcb2..8c272e5617510 100644 --- a/jina/logging/profile.py +++ b/jina/logging/profile.py @@ -1,4 +1,3 @@ -import os import time from collections import defaultdict from functools import wraps @@ -42,19 +41,15 @@ def foo(): @wraps(func) def arg_wrapper(*args, **kwargs): - if 'JINA_PROFILING' in os.environ: - start_t = time.perf_counter() - start_mem = used_memory(unit=1024 * 1024) - r = func(*args, **kwargs) - elapsed = time.perf_counter() - start_t - end_mem = used_memory(unit=1024 * 1024) - # level_prefix = ''.join('-' for v in inspect.stack() if v and v.index is not None and v.index >= 0) - level_prefix = '' - mem_status = 'memory: %4.3fMB -> %4.3fMB' % (start_mem, end_mem) - - default_logger.info('%s%s time: %3.3fs %s' % (level_prefix, func.__qualname__, elapsed, mem_status)) - else: - r = func(*args, **kwargs) + start_t = time.perf_counter() + start_mem = used_memory(unit=1024 * 1024) + r = func(*args, **kwargs) + elapsed = time.perf_counter() - start_t + end_mem = used_memory(unit=1024 * 1024) + # level_prefix = ''.join('-' for v in inspect.stack() if v and v.index is not None and v.index >= 0) + level_prefix = '' + mem_status = 'memory Δ %4.2fMB %4.2fMB -> %4.2fMB' % (end_mem - start_mem, start_mem, end_mem) + default_logger.info('%s%s time: %3.3fs %s' % (level_prefix, func.__qualname__, elapsed, mem_status)) return r return arg_wrapper diff --git a/jina/peapods/pea.py b/jina/peapods/pea.py index 26b0a7a3b96b2..4ccadf4ca3b84 100644 --- a/jina/peapods/pea.py +++ b/jina/peapods/pea.py @@ -203,10 +203,9 @@ def save_executor(self, dump_interval: int = 0): if self.executor.save(): self.logger.info('dumped changes to the executor, %3.0fs since last the save' % (time.perf_counter() - self.last_dump_time)) - self.last_dump_time = time.perf_counter() else: self.logger.info('executor says there is nothing to save') - + self.last_dump_time = time.perf_counter() # self.logger.info({'service': self.name, # 'profile': self._timer.accum_time, # 'timestamp_start': self._timer.start_time, diff --git a/jina/peapods/zmq.py b/jina/peapods/zmq.py index a4c22c10c43b7..e696b43878a33 100644 --- a/jina/peapods/zmq.py +++ b/jina/peapods/zmq.py @@ -46,6 +46,8 @@ def __init__(self, args: 'argparse.Namespace', logger: 'logging.Logger' = None): self.ctx, self.in_sock, self.out_sock, self.ctrl_sock = self.init_sockets() self.bytes_sent = 0 self.bytes_recv = 0 + self.msg_recv = 0 + self.msg_sent = 0 self.poller = zmq.Poller() self.poller.register(self.in_sock, zmq.POLLIN) self.poller.register(self.ctrl_sock, zmq.POLLIN) @@ -127,7 +129,12 @@ def close(self): """Close all sockets and shutdown the ZMQ context associated to this `Zmqlet`. """ self.close_sockets() self.ctx.term() - self.logger.info('bytes_sent: %.0f KB bytes_recv:%.0f KB' % (self.bytes_sent / 1024, self.bytes_recv / 1024)) + self.print_stats() + + def print_stats(self): + """Print out the network stats of of itself """ + self.logger.info('msg_sent: %d bytes_sent: %.0f KB msg_recv: %d bytes_recv:%.0f KB' % ( + self.msg_sent, self.bytes_sent / 1024, self.msg_recv, self.bytes_recv / 1024)) def send_message(self, msg: 'jina_pb2.Message'): """Send a message via the output socket @@ -149,6 +156,7 @@ def send_message(self, msg: 'jina_pb2.Message'): o_sock = self.out_sock self.bytes_sent += send_message(o_sock, msg, **self.send_recv_kwargs) + self.msg_sent += 1 def recv_message(self, callback: Callable[['jina_pb2.Message'], None] = None) -> 'jina_pb2.Message': """Receive a protobuf message from the input socket @@ -160,13 +168,16 @@ def recv_message(self, callback: Callable[['jina_pb2.Message'], None] = None) -> if i_sock is not None: msg, num_bytes = recv_message(i_sock, **self.send_recv_kwargs) self.bytes_recv += num_bytes + self.msg_recv += 1 if callback: return callback(msg) - def reset_bytes(self): + def clear_stats(self): """Reset the internal counter of send and receive bytes to zero. """ self.bytes_recv = 0 self.bytes_sent = 0 + self.msg_recv = 0 + self.msg_sent = 0 class AsyncZmqlet(Zmqlet):