Skip to content

Commit

Permalink
Merge pull request #229 from jina-ai/fix-flow-cli-226
Browse files Browse the repository at this point in the history
fix(cli): add cli to load directly from yaml
  • Loading branch information
hanxiao authored Apr 9, 2020
2 parents b3d0258 + 7fd419a commit f69d82b
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 48 deletions.
11 changes: 11 additions & 0 deletions docs/chapters/envs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ Here is the list of environment variables that ``jina`` respects during runtime.

:default: ``INFO``

.. confval:: JINA_LOG_SSE

Turn on the server-side event (SSE) logging for all peas, pods and executors running in the context.

:default: unset

.. confval:: JINA_LOG_PROFILING

Turn on the profiling logger, which writes profiling records to a local ``jina-profile-*.json`` file.

:default: unset

.. confval:: JINA_LOG_LONG

Expand Down
12 changes: 5 additions & 7 deletions jina/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,8 @@ def save(self, filename: str = None) -> bool:
:return: successfully persisted or not
"""
if not self.is_updated:
self.logger.info('no update since %s, will not save. '
'If you really want to save it, call "touch()" before "save()" to force saving'
% self._last_snapshot_ts)
self.logger.info(f'no update since {self._last_snapshot_ts:%Y-%m-%d %H:%M:%S%z}, will not save. '
'If you really want to save it, call "touch()" before "save()" to force saving')
return False

self.is_updated = False
Expand Down Expand Up @@ -376,10 +375,9 @@ def load_config(cls: Type[AnyExecutor], filename: Union[str, TextIO], separated_
mod = tmp['metas']['py_modules']

if isinstance(mod, str):
if not os.path.isabs(mod):
mod = os.path.join(os.path.dirname(filename), mod)
PathImporter.add_modules(mod)
elif isinstance(mod, list):
mod = [mod]

if isinstance(mod, list):
mod = [m if os.path.isabs(m) else os.path.join(os.path.dirname(filename), m) for m in mod]
PathImporter.add_modules(*mod)
else:
Expand Down
21 changes: 13 additions & 8 deletions jina/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,23 +81,28 @@ def __init__(self, args: 'argparse.Namespace' = None, **kwargs):
as the head and tail routers are removed.
"""
self.logger = get_logger(self.__class__.__name__)
self._pod_nodes = OrderedDict() # type: Dict[str, 'FlowPod']
self._build_level = FlowBuildLevel.EMPTY
self._pod_name_counter = 0
self._last_changed_pod = []

self.update_args(args, **kwargs)

if not self.args.no_gateway:
self._add_gateway()

def update_args(self, args, **kwargs):
from ..main.parser import set_flow_parser
_flow_parser = set_flow_parser()
if args is None:
from ..helper import get_parsed_args
_, args, _ = get_parsed_args(kwargs, _flow_parser)

self.args = args
if self.args.logserver and 'log_sse' not in kwargs:
if kwargs and self.args.logserver and 'log_sse' not in kwargs:
kwargs['log_sse'] = True
self._common_kwargs = kwargs
self._pod_nodes = OrderedDict() # type: Dict[str, 'FlowPod']
self._build_level = FlowBuildLevel.EMPTY
self._pod_name_counter = 0
self._last_changed_pod = []
if not self.args.no_gateway:
self._add_gateway()
self._kwargs = get_non_defaults_args(args, _flow_parser)
self._kwargs = get_non_defaults_args(args, _flow_parser) #: for yaml dump

@classmethod
def to_yaml(cls, representer, data):
Expand Down
11 changes: 0 additions & 11 deletions jina/flow/cli.py

This file was deleted.

10 changes: 7 additions & 3 deletions jina/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,10 @@ def get_parsed_args(kwargs, parser):
return args, p_args, unknown_args


def get_non_defaults_args(args, parser) -> Dict:
_defaults = parser.parse_args([])
return {k: v for k, v in vars(args).items() if getattr(_defaults, k) != v}
def get_non_defaults_args(args, parser, taboo=(None,)) -> Dict:
    """Collect the arguments whose values differ from the parser defaults.

    :param args: the parsed namespace to inspect
    :param parser: the parser defining the defaults; it is run on an empty
        argument list to obtain them
    :param taboo: argument names that are always left out of the result
    :return: a dict mapping each remaining argument name to its non-default value
    """
    baseline = vars(parser.parse_args([]))
    return {
        name: value
        for name, value in vars(args).items()
        # names unknown to the parser have no default to compare against
        if name in baseline and name not in taboo and baseline[name] != value
    }
4 changes: 2 additions & 2 deletions jina/logging/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def get_logger(context: str, context_len: int = 15,
h.setFormatter(ColorFormatter(fmt_str))
logger.addHandler(h)

if log_profile:
if ('JINA_LOG_PROFILING' in os.environ) or log_profile:
h = logging.FileHandler('jina-profile-%s.json' % __uptime__, delay=True)
h.setLevel(verbose_level.value)
h.setFormatter(ProfileFormatter(timed_fmt_str))
Expand All @@ -204,7 +204,7 @@ def get_logger(context: str, context_len: int = 15,
h.setFormatter(JsonFormatter(timed_fmt_str))
logger.addHandler(h)

if log_sse:
if ('JINA_LOG_SSE' in os.environ) or log_sse:
h = QueueHandler(__sse_queue__)
h.setLevel(verbose_level.value)
h.setFormatter(JsonFormatter(timed_fmt_str))
Expand Down
17 changes: 15 additions & 2 deletions jina/main/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,18 @@ def client(args):


def flow(args):
from ..flow.cli import FlowCLI
FlowCLI(args)
"""Start a Flow from a YAML file"""
from ..flow import Flow
if args.yaml_path:
from threading import Event
f = Flow.load_config(args.yaml_path)
f.update_args(args)
with f.build():

try:
Event().wait()
except KeyboardInterrupt:
pass
else:
from jina.logging import default_logger
default_logger.critical('start a flow from CLI requires a valid "--yaml-path"')
7 changes: 6 additions & 1 deletion jina/main/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def set_base_parser():
'Code': ('🧑‍💻', 'https://github.com/jina-ai/jina/'),
'Jina Hub': ('📦', 'https://github.com/jina-ai/jina-hub/'),
'Home': ('🌐', 'https://jina.ai'),
'Join Us': ('🙌', 'hr@jina.ai')
'Hiring': ('🙌', 'hr@jina.ai')
}
url_str = '\n'.join(f'{v[0]} {k:10.10} {colored(v[1], "cyan", attrs=["underline"])}' for k, v in urls.items())

Expand Down Expand Up @@ -116,6 +116,9 @@ def set_pea_parser(parser=None):
'> a supported executor\'s class name, '
'> one of "_clear", "_route", "_forward", "_logroute", "_merge" '
'> the content of YAML config (must starts with "!")') # pod(no use) -> pea
gp0.add_argument('--py-modules', type=str, nargs='*',
help='the customized python modules need to be imported before loading the'
' executor')

gp1 = add_arg_group(parser, 'pea container arguments')
gp1.add_argument('--image', type=str,
Expand Down Expand Up @@ -201,6 +204,8 @@ def set_pea_parser(parser=None):
help='turn on remote logging')
gp7.add_argument('--log-profile', action='store_true', default=False,
help='turn on the profiling logger')
gp7.add_argument('--override-exec-log', action='store_true', default=False,
help='turn on to allow the override of the executor logger by the pea logger')
_set_grpc_parser(parser)
return parser

Expand Down
13 changes: 4 additions & 9 deletions jina/peapods/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from .pea import BasePea
from .. import __ready_msg__
from ..helper import valid_yaml_path, kwargs2list
from ..helper import valid_yaml_path, kwargs2list, get_non_defaults_args
from ..logging import get_logger


Expand All @@ -17,17 +17,12 @@ def post_init(self):
import docker
self._client = docker.from_env()

from ..main.parser import set_pea_parser
_defaults = vars(set_pea_parser().parse_args([]))
non_defaults = {}
# the image arg should be ignored otherwise it keeps using ContainerPea in the container
# basically all args in BasePea-docker arg group should be ignored.
# this prevent setting containerPea twice
taboo = {'image', 'entrypoint', 'volumes', 'pull_latest'}

for k, v in vars(self.args).items():
if k in _defaults and k not in taboo and _defaults[k] != v:
non_defaults[k] = v
from ..main.parser import set_pea_parser
non_defaults = get_non_defaults_args(self.args, set_pea_parser(),
taboo={'image', 'entrypoint', 'volumes', 'pull_latest'})

if self.args.pull_latest:
self._client.images.pull(self.args.image)
Expand Down
8 changes: 8 additions & 0 deletions jina/peapods/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ def load_executor(self):
try:
self.executor = BaseExecutor.load_config(self.args.yaml_path,
self.args.separated_workspace, self.args.replica_id)
if self.args.override_exec_log:
self.executor.logger = self.logger
self.executor.attach(pea=self)
# self.logger = get_logger('%s(%s)' % (self.name, self.executor.name), **vars(self.args))
except FileNotFoundError:
Expand Down Expand Up @@ -296,6 +298,7 @@ def msg_callback(self, msg: 'jina_pb2.Message') -> Optional['jina_pb2.Message']:

def loop_body(self):
"""The body of the request loop """
self.load_plugins()
self.load_executor()
self.zmqlet = Zmqlet(self.args, logger=self.logger)
self.set_ready()
Expand All @@ -313,6 +316,11 @@ def loop_body(self):
# t_loop_end = time.perf_counter()
# self.logger.info(f'handle {(t_callback - t_loop_start) / (t_loop_end - t_loop_start):2.2f}')

def load_plugins(self):
    """Import the python modules listed in ``self.args.py_modules``.

    Does nothing when no module is given.
    """
    modules = self.args.py_modules
    if not modules:
        return
    from ..helper import PathImporter
    PathImporter.add_modules(*modules)

def loop_teardown(self):
"""Stop the request loop """
if hasattr(self, 'executor'):
Expand Down
11 changes: 6 additions & 5 deletions jina/peapods/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,8 @@ def __init__(self, args: Union['argparse.Namespace', Dict]):
self.is_tail_router = False
self.deducted_head = None
self.deducted_tail = None
self._args = args
self.peas_args = self._parse_args(args)
self.sentinel_threads = []
if isinstance(args, argparse.Namespace) and getattr(args, 'shutdown_idle', False):
self.sentinel_threads.append(Thread(target=self.close_if_idle,
name='sentinel-shutdown-idle',
daemon=True))

@property
def is_idle(self) -> bool:
Expand Down Expand Up @@ -163,6 +159,11 @@ def set_runtime(self, runtime: str):
# s.host_out = __default_host__

def start_sentinels(self):
self.sentinel_threads = []
if isinstance(self._args, argparse.Namespace) and getattr(self._args, 'shutdown_idle', False):
self.sentinel_threads.append(Thread(target=self.close_if_idle,
name='sentinel-shutdown-idle',
daemon=True))
for t in self.sentinel_threads:
t.start()

Expand Down
12 changes: 12 additions & 0 deletions tests/yaml/slow-flow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
!Flow
with:
runtime: process
logserver: True
logserver_config: test-server-config.yml
max_idle_time: 5
shutdown_idle: true
pods:
sw:
yaml_path: SlowWorker
replicas: 5
py_modules: ./yaml/slow-worker.py
19 changes: 19 additions & 0 deletions tests/yaml/slow-worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os
import time

from jina.executors.crafters import BaseDocCrafter


class SlowWorker(BaseDocCrafter):
    """A crafter used in tests that pauses inside ``craft`` when flagged as slow."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): the previous comment said "half of worker is slow", but
        # every process whose pid is not a multiple of 10 is flagged as slow
        pid_remainder = os.getpid() % 10
        self.is_slow = pid_remainder != 0
        # logged for every worker, whatever the value of ``is_slow``
        self.logger.warning('im a slow worker')

    def craft(self, doc_id, *args, **kwargs):
        """Return the document id unchanged, sleeping 2 seconds first when slow."""
        crafted = {'doc_id': doc_id}
        if not self.is_slow:
            return crafted
        self.logger.warning('slowly doing')
        time.sleep(2)
        return crafted

0 comments on commit f69d82b

Please sign in to comment.